The original link: www.liaochuntao.cn/2019/09/16/…

Earlier reading:

Nacos DistroConsistencyServiceImpl (source code)

Distro: the AP consistency protocol that Alibaba uses inside Nacos

Core code implementation

Data synchronization when the Nacos Naming module starts

DistroConsistencyServiceImpl

public void load() throws Exception {
  if (SystemUtils.STANDALONE_MODE) {
    initialized = true;
    return;
  }
  // size = 1 means only myself in the list, we need at least one another server alive:
  // In cluster mode, you need to wait for at least two nodes before the logic can proceed
  while (serverListManager.getHealthyServers().size() <= 1) {
    Thread.sleep(1000L);
    Loggers.DISTRO.info("waiting server list init...");
  }

  // Get all healthy cluster nodes
  for (Server server : serverListManager.getHealthyServers()) {
    // Skip the local node itself; no need to pull data from ourselves
    if (NetUtils.localServer().equals(server.getKey())) {
      continue;
    }
    if (Loggers.DISTRO.isDebugEnabled()) {
      Loggers.DISTRO.debug("sync from " + server);
    }
    // Perform a full data pull from another server only once, leaving the rest to incremental synchronization
    if (syncAllDataFromRemote(server)) {
      initialized = true;
      return; }}}Copy the code
The action of pulling full data

The requester side of the data pull

public boolean syncAllDataFromRemote(Server server) {
  try {
    // Get data
    byte[] data = NamingProxy.getAllData(server.getKey());
    // The received data is processed
    processData(data);
    return true;
  } catch (Exception e) {
    Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
    return false;
  }
}
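
NamingProxy.getAllData is not shown above. Conceptually (and this is only an illustrative sketch, not the real NamingProxy code) the full pull amounts to an HTTP GET against the peer's /datums endpoint shown in the next snippet; the exact context path used below is an assumption.

// Illustrative sketch only (assumption): what "pull all data from a peer" amounts to.
// Requires java.io.* and java.net.* imports; the path /nacos/v1/ns/distro/datums is assumed.
byte[] pullAllData(String server) throws IOException {
    URL url = new URL("http://" + server + "/nacos/v1/ns/distro/datums");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    try (InputStream in = conn.getInputStream();
         ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        byte[] buffer = new byte[4096];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
        return out.toByteArray();
    }
}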

Response from the data provider

@RequestMapping(value = "/datums", method = RequestMethod.GET)
public ResponseEntity getAllDatums(HttpServletRequest request, HttpServletResponse response) throws Exception {
  // Serialize the stored data container Map directly
  String content = new String(serializer.serialize(dataStore.getDataMap()), StandardCharsets.UTF_8);
  return ResponseEntity.ok(content);
}

Next, what happens after the full data set has been pulled from another Server node

public void processData(byte[] data) throws Exception {
        if (data.length > 0) {
            // Deserialize the data first
            Map<String, Datum<Instances>> datumMap =
                serializer.deserializeMap(data, Instances.class);

            // Perform traversal on the data
            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                // The data is stored in the DataStore
                dataStore.put(entry.getKey(), entry.getValue());
                // Check whether a listener is already registered for this key; if not, this is new data
                if (!listeners.containsKey(entry.getKey())) {
                    // pretty sure the service not exist:
                    if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
                        Loggers.DISTRO.info("creating service {}", entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        service.setName(serviceName);
                        service.setNamespaceId(namespaceId);
                        service.setGroupName(Constants.DEFAULT_GROUP);
                        // now validate the service. if failed, exception will be thrown
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        service.recalculateChecksum();
                        // Call back the Listener with the new Service data
                        listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                            .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                    }
                }
            }

            // Perform the Listener callbacks
            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                    continue;
                }

                try {
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                    continue;
                }

                // Update data store if listener executed successfully:
                dataStore.put(entry.getKey(), entry.getValue());
            }
        }
    }

The next step is the incremental synchronization of data. First, I will introduce a Distro protocol concept — authoritative Server

Determining the authoritative Server

public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    // Determine whether the data can be responded to by this node
    public boolean responsible(Cluster cluster, Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    // Perform a Hash computation based on the ServiceName to find the index of the corresponding authoritative node and check whether it is the current node. If yes, the data can be processed by the current node
    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }

        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }

        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }

        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    // Find the address of the authoritative Server according to ServiceName
    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }

        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);

            return NetUtils.localServer();
        }
    }

    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {
    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {

        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}

The DistroMapper component above is a key piece of the Distro protocol: it hashes a key by service name to locate the authoritative node in the cluster's healthy node list.
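
To make the role of DistroMapper concrete, here is a minimal sketch of how a node could use it to decide whether to handle a write itself or hand it off to the authoritative node. Note that processLocally(...) and forwardTo(...) are hypothetical helpers for illustration only, not actual Nacos methods.

// Hedged sketch: routing a write with the Distro hash.
// processLocally(...) and forwardTo(...) are hypothetical helpers, for illustration only.
public void handleInstanceWrite(String serviceName, Instance instance) throws Exception {
    if (distroMapper.responsible(serviceName)) {
        // This node is the authoritative server for serviceName: apply the write locally;
        // the incremental sync described below will broadcast it to the other nodes.
        processLocally(serviceName, instance);
        return;
    }
    // Another node owns this service: proxy the request to the authoritative server.
    String targetServer = distroMapper.mapSrv(serviceName);
    forwardTo(targetServer, serviceName, instance);
}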

Incremental data synchronization between nodes
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        // Build the task executor
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            // Submit the task scheduler
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        // Find a TaskScheduler based on the Hash Key for task submission
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    // Retrieve a task from the task cache queue (with a timeout setting)
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);
                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }
                    // If there is no cluster or the cluster node is empty
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }
                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }
                    // Stash the key temporarily so that tasks can be processed in batches
                    keys.add(key);
                    dataSize++;
                    // If the temporary task quantity reaches the specified batch, or the task time reaches the maximum setting, start the data synchronization task
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
                        // Create a SyncTask task for each server
                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());
                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // The task is submitted and the task execution delay is set to immediate
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
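
The snippet above does not show who calls addTask. As a simplified sketch (an assumption about how DistroConsistencyServiceImpl wires things together, not a verbatim copy of its code): when a client registers or updates an ephemeral instance, the node owning that key first applies the change locally and then queues the key for broadcast.

// Simplified sketch (assumption): how a write enters the incremental sync pipeline.
public void put(String key, Record value) throws NacosException {
    // Apply the change to the local DataStore and notify the local listeners
    onPut(key, value);
    // Queue the key; a TaskScheduler will batch it into SyncTasks for the other nodes
    taskDispatcher.addTask(key);
}
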
DataSyncer, the actual executor of the data synchronization tasks
public class DataSyncer {

    // ...

    @PostConstruct
    public void init() {
        // Perform periodic data synchronization tasks every five seconds.
        startTimedSync();
    }

    // Task submission
    public void submit(SyncTask task, long delay) {
        // If it's a new task:
        if (task.getRetryCount() == 0) {
            // Iterate over all task keys
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                // Add data tasks to the Map to avoid repeated submission of data synchronization tasks
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    // If the task already exists, remove the Key of the task
                    iterator.remove();
                }
            }
        }

        // If all tasks have been removed, end the task submission
        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }
        // Asynchronous tasks perform data synchronization
        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }
            // Obtain the actual synchronization data of the data synchronization task
            List<String> keys = task.getKeys();
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }
            // 2. get the datums by keys and check the datum is empty or not
            // Obtain batch data by key
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            // If the data has been removed, cancel the task
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }
            // Data serialization
            byte[] data = serializer.serialize(datumMap);
            long timestamp = System.currentTimeMillis();
            // Perform incremental data synchronization and commit to other nodes
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            // If the data synchronization task fails, create the SyncTask again and set the retry times
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    // Task retry
    public void retrySync(SyncTask syncTask) {
        Server server = new Server();
        server.setIp(syncTask.getTargetServer().split(":")[0]);
        server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(server)) {
            // if server is no longer in healthy server list, ignore this task:
            return;
        }
        // TODO may choose other retry policy.
        // Automatically delays the next execution of the retry task
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    // Execute periodic tasks
    // Broadcast the data to other Server nodes each time
    public class TimedSync implements Runnable {

        @Override
        public void run() {
            try {
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", getServers());
                }
                // send local timestamps to other servers:
                Map<String, String> keyChecksums = new HashMap<>(64);
                // For the data storage container
                for (String key : dataStore.keys()) {
                    // If you are not the authoritative Server responsible for this data, you have no right to broadcast this data between clusters
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }
                    // Get the data.
                    Datum datum = dataStore.get(key);
                    if (datum == null) {
                        continue;
                    }
                    // Add the data broadcast list
                    keyChecksums.put(key, datum.value.getChecksum());
                }
                if (keyChecksums.isEmpty()) {
                    return;
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }
                // Perform data broadcast operations on all nodes in the cluster except itself
                for (Server member : getServers()) {
                    if (NetUtils.localServer().equals(member.getKey())) {
                        continue;
                    }
                    // Data broadcast between clusters
                    NamingProxy.syncCheckSums(keyChecksums, member.getKey());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }
    }

    public List<Server> getServers() {
        return serverListManager.getHealthyServers();
    }

    public String buildKey(String key, String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}

What the other node does after receiving the checksum data

@RequestMapping(value = "/checksum", method = RequestMethod.PUT)
public ResponseEntity syncChecksum(HttpServletRequest request, HttpServletResponse response) throws Exception {
    // The data transferred from that node
    String source = WebUtils.required(request, "source");
    String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
    // Deserialize the received data
    Map<String, String> dataMap = serializer.deserialize(entity.getBytes(), new TypeReference<Map<String, String>>() {});
    // Data receive operation
    consistencyService.onReceiveChecksums(dataMap, source);
    return ResponseEntity.ok("ok");
}

The actual checksum comparison happens in onReceiveChecksums:

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    if (syncChecksumTasks.containsKey(server)) {
        // Already in process of this server:
        Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
        return;
    }
    // Indicates that data from the current Server is being processed
    syncChecksumTasks.put(server, "1");
    try {
        // The key to be updated
        List<String> toUpdateKeys = new ArrayList<>();
        // The Key to delete
        List<String> toRemoveKeys = new ArrayList<>();
        // Perform a traversal of the incoming data
        for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
            // If this node is itself the authoritative server for a key in the transmitted data, that key should never have been sent from a remote server; abort the synchronization (it violates the authoritative-server rule)
            if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                // this key should not be sent from remote server:
                Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
                // abort the procedure:
                return;
            }
            // If the data does not exist in the current data store container or the checksum value is different, the data is updated
            if (!dataStore.contains(entry.getKey())
                || dataStore.get(entry.getKey()).value == null
                || !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                toUpdateKeys.add(entry.getKey());
            }
        }
        // Directly traverses all data in the data storage container
        for (String key : dataStore.keys()) {
            // If the data is not the source server's responsibility, skip it
            if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
                continue;
            }
            // If the synchronized data does not contain the key, the key needs to be deleted
            if (!checksumMap.containsKey(key)) {
                toRemoveKeys.add(key);
            }
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
        }
        // Perform the data removal operation
        for (String key : toRemoveKeys) {
            onRemove(key);
        }
        if (toUpdateKeys.isEmpty()) {
            return;
        }
        try {
            // Pull the data according to the key to be updated, then perform operations on the synchronized data, and the rest is as done in the original full data synchronization
            byte[] result = NamingProxy.getData(toUpdateKeys, server);
            processData(result);
        } catch (Exception e) {
            Loggers.DISTRO.error("get data from " + server + " failed!", e);
        }
    } finally {
        // Remove this 'in process' flag:
        // Remove the data synchronization task id of the source server
        syncChecksumTasks.remove(server);
    }
}