
This article examines Nacos's DistroConsistencyServiceImpl.


nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java

public interface ConsistencyService {

     * Put a data related to a key to Nacos cluster
     * @param key   key of data, this key should be globally unique
     * @param value value of data
     * @throws NacosException
     * @see
    void put(String key, Record value) throws NacosException;

     * Remove a data from Nacos cluster
     * @param key key of data
     * @throws NacosException
    void remove(String key) throws NacosException;

     * Get a data from Nacos cluster
     * @param key key of data
     * @return data related to the key
     * @throws NacosException
    Datum get(String key) throws NacosException;

     * Listen for changes of a data
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
    void listen(String key, RecordListener listener) throws NacosException;

     * Cancel listening of a data
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
    void unlisten(String key, RecordListener listener) throws NacosException;

     * Tell the status of this consistency service
     * @return true if available
    boolean isAvailable();

  • The ConsistencyService interface defines the put, remove, get, listen, unlisten, and isAvailable methods.


nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java

public interface EphemeralConsistencyService extends ConsistencyService {

  • The EphemeralConsistencyService interface extends the ConsistencyService interface.


nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);


            returnt; }}); @Autowired private DistroMapper distroMapper; @Autowired private DataStore dataStore; @Autowired private TaskDispatcher taskDispatcher; @Autowired private DataSyncer dataSyncer; @Autowired private Serializer serializer; @Autowired private ServerListManager serverListManager; @Autowired private SwitchDomain switchDomain; @Autowired private GlobalConfig globalConfig; private boolean initialized =false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    public void init() {
        GlobalExecutor.submit(new Runnable() {
            public void run() {
                try {
                } catch (Exception e) {
                    Loggers.DISTRO.error("load data failed.", e); }}}); executor.submit(notifier); } public void load() throws Exception {if (SystemUtils.STANDALONE_MODE) {
            initialized = true;
        // size = 1 means only myself in the list, we need at least one another server alive:
        while (serverListManager.getHealthyServers().size() <= 1) {
  "waiting server list init...");

        for (Server server : serverListManager.getHealthyServers()) {
            if (NetUtils.localServer().equals(server.getKey())) {
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("sync from " + server);
            // try sync data from remote server:
            if (syncAllDataFromRemote(server)) {
                initialized = true;
                return; }}} / /... public boolean syncAllDataFromRemote(Server server) { try { byte[] data = NamingProxy.getAllData(server.getKey()); processData(data);return true;
        } catch (Exception e) {
            Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
            return false;

    public void processData(byte[] data) throws Exception {
        if (data.length > 0) {
            Map<String, Datum<Instances>> datumMap =
                serializer.deserializeMap(data, Instances.class);

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                dataStore.put(entry.getKey(), entry.getValue());

                if(! listeners.containsKey(entry.getKey())) { // pretty sure the service not exist:if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
              "creating service {}", entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        // now validate the service. iffailed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0) .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); }}}for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

                if(! listeners.containsKey(entry.getKey())) { // Should not happen: Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());

                try {
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);

                // Update data store iflistener executed successfully: dataStore.put(entry.getKey(), entry.getValue()); }}} / /... @Override public void put(String key, Record value) throws NacosException { onPut(key, value); taskDispatcher.addTask(key); } @Override public void remove(String key) throws NacosException { onRemove(key); listeners.remove(key); } @Override public Datum get(String key) throws NacosException {returndataStore.get(key); } / /... @Override public void listen(String key, RecordListener listener) throws NacosException {if(! listeners.containsKey(key)) { listeners.put(key, new CopyOnWriteArrayList<>()); }if (listeners.get(key).contains(listener)) {


    public void unlisten(String key, RecordListener listener) throws NacosException {
        if(! listeners.containsKey(key)) {return;
        for (RecordListener recordListener : listeners.get(key)) {
            if (recordListener.equals(listener)) {

    public boolean isAvailable() {
        returnisInitialized() ||; } / /... }Copy the code
  • DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface.
  • Its init method asynchronously executes the load method, which calls syncAllDataFromRemote; that method fetches the data via NamingProxy.getAllData and passes it to processData, which invokes the registered listeners' onChange callbacks and then puts the data into the dataStore. Finally, init submits the Notifier to the executor so that it runs asynchronously.
  • Its put method calls onPut and then taskDispatcher.addTask(key); its remove method calls onRemove and then listeners.remove(key); its get method reads directly from the dataStore; listen adds a RecordListener; unlisten removes the RecordListener; isAvailable returns true if isInitialized() is true or the server status is ServerStatus.UP.


nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            tasks.add(Pair.with(datumKey, action));

        public int getTaskSize() {
            return tasks.size();

        public void run() {
  "distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();


                    int count = 0;

                    if(! listeners.containsKey(datumKey)) {continue;

                    for (RecordListener listener : listeners.get(datumKey)) {


                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);

                            if (action == ApplyAction.DELETE) {
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); }}if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count,;
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); }}}}Copy the code
  • Notifier implements the Runnable interface; its run method takes tasks from the LinkedBlockingQueue one at a time and invokes the listener callbacks registered for each task's key.


