Preface

This article focuses on the PushReceiver of the Nacos client.

PushReceiver

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java

public class PushReceiver implements Runnable {

    private ScheduledExecutorService executorService;

    private static final int UDP_MSS = 64 * 1024;

    private DatagramSocket udpSocket;

    private HostReactor hostReactor;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            udpSocket = new DatagramSocket();

            executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    returnthread; }}); executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + \ \ "}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\" "
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + \ \ "}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

    public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;
    }

    public int getUDPPort() {
        returnudpSocket.getLocalPort(); }}Copy the code
  • PushReceiver implements the Runnable interface. Its constructor creates a udpSocket and a ScheduledThreadPoolExecutor, then submits itself to that executor via executorService.execute(this)
  • Its run method uses a while (true) loop to call udpSocket.receive(packet), parses the received data into a PushPacket, and handles it according to pushPacket.type
  • When pushPacket.type is dom or service, it calls hostReactor.processServiceJSON(pushPacket.data); when pushPacket.type is dump, it serializes hostReactor.getServiceInfoMap() into the ack; in every case it finally sends an ack back to the sender

HostReactor

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java

public class HostReactor { private static final long DEFAULT_DELAY = 1000L; private static final long UPDATE_HOLD_INTERVAL = 5000L; private final Map<String, ScheduledFuture<? >> futureMap = new HashMap<String, ScheduledFuture<? > > (); private Map<String, ServiceInfo> serviceInfoMap; private Map<String, Object> updatingMap; private PushReceiver pushReceiver; private EventDispatcher eventDispatcher; private NamingProxy serverProxy; private FailoverReactor failoverReactor; private String cacheDir; private ScheduledExecutorService executor; public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) { this(eventDispatcher, serverProxy, cacheDir,false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }

    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                       boolean loadCacheAtStart, int pollingThreadCount) {

        executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                returnthread; }}); this.eventDispatcher = eventDispatcher; this.serverProxy = serverProxy; this.cacheDir = cacheDir;if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else{ this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16); } this.updatingMap = new ConcurrentHashMap<String, Object>(); this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushReceiver = new PushReceiver(this); } / /... public ServiceInfo processServiceJSON(String json) { ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class); ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if(serviceInfo.getHosts() == null || ! serviceInfo.validate()) { //empty or error push, just ignorereturn oldService;
        }

        boolean changed = false;

        if(oldService ! = null) {if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                    + ", new-t: " + serviceInfo.getLastRefTime());
            }

            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                oldHostMap.put(host.toInetAddr(), host);
            }

            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                newHostMap.put(host.toInetAddr(), host);
            }

            Set<Instance> modHosts = new HashSet<Instance>();
            Set<Instance> newHosts = new HashSet<Instance>();
            Set<Instance> remvHosts = new HashSet<Instance>();

            List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                newHostMap.entrySet());
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if(oldHostMap.containsKey(key) && ! StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host);continue;
                }

                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                }
            }

            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }

                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                }

            }

            if (newHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                    + serviceInfo.getKey() + "- >" + JSON.toJSONString(newHosts));
            }

            if (remvHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                    + serviceInfo.getKey() + "- >" + JSON.toJSONString(remvHosts));
            }

            if (modHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                    + serviceInfo.getKey() + "- >" + JSON.toJSONString(modHosts));
            }

            serviceInfo.setJsonFromServer(json);

            if(newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { eventDispatcher.serviceChanged(serviceInfo); DiskCache.write(serviceInfo, cacheDir); }}else {
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + "- >" + JSON
                .toJSONString(serviceInfo.getHosts()));
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            eventDispatcher.serviceChanged(serviceInfo);
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }

        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
                "- >" + JSON.toJSONString(serviceInfo.getHosts()));
        }

        return serviceInfo;
    }

    public Map<String, ServiceInfo> getServiceInfoMap() {
        returnserviceInfoMap; } / /... }Copy the code
  • The processServiceJSON method compares the received serviceInfo with the locally stored one to determine whether it has changed. It updates the local serviceInfoMap and, when the hosts have changed, calls back eventDispatcher.serviceChanged(serviceInfo) and DiskCache.write(serviceInfo, cacheDir). The HostReactor constructor has a loadCacheAtStart parameter (default false); if it is true, serviceInfoMap is initialized by reading serviceInfo from local files via DiskCache.read(this.cacheDir)

DiskCache

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/cache/DiskCache.java

public class DiskCache {

    public static void write(ServiceInfo dom, String dir) {

        try {
            makeSureCacheDirExists(dir);


            File file = new File(dir, dom.getKeyEncoded());
            if(! file.exists()) { // add another ! file.exists() to avoid conflicted creating-new-file from multi-instancesif(! file.createNewFile() && ! file.exists()) { throw new IllegalStateException("failed to create cache file");
                }
            }

            StringBuilder keyContentBuffer = new StringBuilder("");

            String json = dom.getJsonFromServer();

            if (StringUtils.isEmpty(json)) {
                json = JSON.toJSONString(dom);
            }

            keyContentBuffer.append(json);

            //Use the concurrent API to ensure the consistency.
            ConcurrentDiskUtil.writeFileContent(file, keyContentBuffer.toString(), Charset.defaultCharset().toString());

        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to write cache for dom:" + dom.getName(), e);
        }
    }

    public static String getLineSeparator() {
        return System.getProperty("line.separator");
    }

    public static Map<String, ServiceInfo> read(String cacheDir) {
        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

        BufferedReader reader = null;
        try {
            File[] files = makeSureCacheDirExists(cacheDir).listFiles();
            if (files == null || files.length == 0) {
                return domMap;
            }

            for (File file : files) {
                if(! file.isFile()) {continue;
                }

                String fileName = URLDecoder.decode(file.getName(), "UTF-8");

                if(! (fileName.endsWith(Constants.SERVICE_INFO_SPLITER +"meta") || fileName.endsWith(
                    Constants.SERVICE_INFO_SPLITER + "special-url"))) {
                    ServiceInfo dom = new ServiceInfo(fileName);
                    List<Instance> ips = new ArrayList<Instance>();
                    dom.setHosts(ips);

                    ServiceInfo newFormat = null;

                    try {
                        String dataString = ConcurrentDiskUtil.getFileContent(file,
                            Charset.defaultCharset().toString());
                        reader = new BufferedReader(new StringReader(dataString));

                        String json;
                        while((json = reader.readLine()) ! = null) { try {if(! json.startsWith("{")) {
                                    continue;
                                }

                                newFormat = JSON.parseObject(json, ServiceInfo.class);

                                if (StringUtils.isEmpty(newFormat.getName())) {
                                    ips.add(JSON.parseObject(json, Instance.class));
                                }
                            } catch (Throwable e) {
                                NAMING_LOGGER.error("[NA] error while parsing cache file: " + json, e);
                            }
                        }
                    } catch (Exception e) {
                        NAMING_LOGGER.error("[NA] failed to read cache for dom: " + file.getName(), e);
                    } finally {
                        try {
                            if(reader ! = null) { reader.close(); } } catch (Exception e) { //ignore } }if(newFormat ! = null && ! StringUtils.isEmpty(newFormat.getName()) && ! CollectionUtils.isEmpty( newFormat.getHosts())) { domMap.put(dom.getKey(), newFormat); }else if(! CollectionUtils.isEmpty(dom.getHosts())) { domMap.put(dom.getKey(), dom); } } } } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }

        return domMap;
    }

    private static File makeSureCacheDirExists(String dir) {
        File cacheDir = new File(dir);
        if(! cacheDir.exists() && ! cacheDir.mkdirs()) { throw new IllegalStateException("failed to create cache dir: " + dir);
        }

        returncacheDir; }}Copy the code
  • DiskCache's write method writes serviceInfo to a file under the dir folder, named by serviceInfo.getKeyEncoded(); the read method reads the files in the dir folder, parses them into ServiceInfo, puts them in domMap, and returns domMap

summary

  • PushReceiver implements the Runnable interface. Its constructor creates a udpSocket and a ScheduledThreadPoolExecutor, then submits itself to that executor via executorService.execute(this)
  • Its run method uses a while (true) loop to call udpSocket.receive(packet), parses the received data into a PushPacket, and handles it according to pushPacket.type
  • When pushPacket.type is dom or service, it calls hostReactor.processServiceJSON(pushPacket.data); when pushPacket.type is dump, it serializes hostReactor.getServiceInfoMap() into the ack; in every case it finally sends an ack back to the sender

doc

  • PushReceiver