preface

In version 1.4, Nacos uses HTTP short connections plus long polling. The client initiates an HTTP request, the server holds the request and responds when the configuration changes, and the timeout is 30s.

| Port | Offset | Type | Implementation | Description |
| ---- | ------ | ---- | -------------- | ----------- |
| 8848 | 0 | HTTP | SpringBoot | Console access, cluster communication, client communication. |
| 7848 | -1000 | gRPC | JRaftServer | JRaft service. |

In version 2.0, Nacos replaces HTTP short-connection long polling with gRPC long connections. Configuration synchronization is implemented in a push-pull mode.

  • Pull: the client registers its listeners with the server at regular intervals and updates its local configuration if the configuration has changed.

  • Push: when a configuration changes, the server notifies the clients listening on that configuration; the client then re-registers the listener and updates the configuration.

| Port | Offset | Type | Implementation | Description |
| ---- | ------ | ---- | -------------- | ----------- |
| 8848 | 0 | HTTP | SpringBoot | Console access. |
| 7848 | -1000 | gRPC | JRaftServer | JRaft service. |
| 9848 | 1000 | gRPC | GrpcSdkServer | Handles Nacos client requests. |
| 9849 | 1001 | gRPC | GrpcClusterServer | Nacos server cluster communication. |

1.4 Long Polling

Recalling version 1.4: when the ClientWorker was constructed, two thread services were started.

  • A thread service that checks for and submits LongPollingRunnable long polling tasks (deprecated in 2.0)

When the ClientWorker is constructed, a scheduled task is started that executes the checkConfigInfo method every 10ms. checkConfigInfo checks the current number of CacheData entries and decides whether to start a new long polling task: if the current number of long polling tasks is less than Math.ceil(cacheMap.size() / 3000), a new one is started. With few configuration files, there is at most one long polling task.

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    // ...
    // Check and submit LongPollingRunnable to this.executorService
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
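To make the ceil(size / 3000) rule above concrete, here is a small self-contained sketch (not Nacos source; the 3000-per-task constant is the batch size described above):

// Sketch: number of long polling tasks for a given number of CacheData entries.
public class LongPollingTaskCount {

    static final int PER_TASK_CONFIG_SIZE = 3000; // batch size described above

    static int taskCount(int cacheMapSize) {
        return (int) Math.ceil(cacheMapSize / (double) PER_TASK_CONFIG_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(taskCount(100));   // 1 -> a single long polling task
        System.out.println(taskCount(3000));  // 1
        System.out.println(taskCount(3001));  // 2 -> a second task is started
    }
}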
  • The LongPollingRunnable long polling task initiates a 30-second long poll against the server; when the server detects a configuration change, it pushes the change to the client (deprecated in 2.0). A minimal sketch of this loop follows.
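A minimal sketch of that loop from the client's point of view, assuming the 1.x open-API listener endpoint and header name (simplified; the real LongPollingRunnable also manages failover, local snapshots, and md5 bookkeeping):

// Sketch (not Nacos source): the client blocks up to ~30s per request; the
// server responds early with the changed groupKeys, otherwise with an empty body.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

public class LongPollingSketch {

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        while (true) {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://localhost:8848/nacos/v1/cs/configs/listener"))
                    .timeout(Duration.ofSeconds(35))         // slightly longer than the 30s server hold
                    .header("Long-Pulling-Timeout", "30000") // how long the server may hold the request
                    .POST(HttpRequest.BodyPublishers.ofString("Listening-Configs=...")) // dataId/group/md5 tuples
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            if (!response.body().isEmpty()) {
                System.out.println("changed keys: " + response.body()); // then fetch the latest config
            }
        }
    }
}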

2.0 Client listening configuration

After 2.0, configuration listening no longer uses the HTTP short-connection long polling mode, but a long connection instead.

When ClientWorker is constructed, a ConfigRpcTransportClient is created, a thread service is injected, and its start method is executed.

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
        final Properties properties) throws NacosException {
    this.configFilterChainManager = configFilterChainManager;
    init(properties);
    agent = new ConfigRpcTransportClient(properties, serverListManager);
    ScheduledExecutorService executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker");
                    t.setDaemon(true);
                    return t;
                }
            });
    agent.setExecutor(executorService);
    agent.start();
}

The start method of ConfigRpcTransportClient uses the injected thread service to run a configuration synchronization task in an infinite loop.

// ConfigRpcTransportClient
@Override
public void startInternal() throws NacosException {
    executor.schedule(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    // Wait to be woken up, or at most 5s
                    listenExecutebell.poll(5L, TimeUnit.SECONDS);
                    // Execute configuration listening
                    executeConfigListen();
                } catch (Exception e) {
                    LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
                }
            }
        }
    }, 0L, TimeUnit.MILLISECONDS);
}

Let’s take a look at the ConfigRpcTransportClient properties.

public class ConfigRpcTransportClient extends ConfigTransportClient {
    private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<Object>(1);
    private Object bellItem = new Object();
    private long lastAllSyncTime = System.currentTimeMillis();
}
  • BlockingQueue listenExecutebell: a blocking queue of capacity 1 used to wake up the blocked configuration synchronization task when an element is offered (a minimal sketch of this pattern follows the list).
  • Object bellItem: a plain Object used as the element put into the blocking queue.
  • long lastAllSyncTime: timestamp of the last full synchronization.
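A minimal, self-contained sketch of this "bell" pattern (not Nacos source): a capacity-1 queue coalesces any number of wake-up signals into one, and poll with a timeout doubles as the 5s fallback.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BellDemo {

    private static final BlockingQueue<Object> bell = new ArrayBlockingQueue<Object>(1);
    private static final Object bellItem = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // Wake up early when the bell rings, or every 5s at the latest
                        bell.poll(5L, TimeUnit.SECONDS);
                        System.out.println("executeConfigListen()...");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        worker.setDaemon(true);
        worker.start();

        // offer() never blocks; if the queue already holds the item, extra rings are simply dropped
        bell.offer(bellItem);
        bell.offer(bellItem);
        Thread.sleep(200L);
    }
}

Because offer on a full capacity-1 queue returns false instead of blocking, repeated notifications collapse into a single wake-up.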

1. CacheMap groups

Let’s look at what this configuration synchronization task does. The executeConfigListen method runs whenever an element is put into the blocking queue, or after the 5s poll times out.

// ClientWorker.ConfigRpcTransportClient
/**
 * groupKey -> cacheData.
 */
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
            new HashMap<String, CacheData>());

@Override
public void executeConfigListen() {
    // taskId -> cacheData (with listeners)
    Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
    // taskId -> cacheData (without listeners)
    Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
    long now = System.currentTimeMillis();
    // A full sync is needed if >= 5 minutes have passed since the last full synchronization
    boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
    for (CacheData cache : cacheMap.get().values()) {
        synchronized (cache) {
            if (cache.isSyncWithServer()) {
                // 1. For configurations already in sync, re-verify md5 consistency between cacheData and listeners
                cache.checkListenerMd5();
                // Within 5 minutes of the last full sync, skip this cacheData
                if (!needAllSync) {
                    continue;
                }
            }
            if (!CollectionUtils.isEmpty(cache.getListeners())) {
                // cacheData with listeners
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<CacheData>();
                        listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    cacheDatas.add(cache);
                }
            } else if (CollectionUtils.isEmpty(cache.getListeners())) {
                // cacheData without listeners
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<CacheData>();
                        removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    cacheDatas.add(cache);
                }
            }
        }
    }
    boolean hasChangedKeys = false;
    // 2. Process cacheData with listeners: send a batch listen request
    if (!listenCachesMap.isEmpty()) {
        // ...
    }
    // 3. Process cacheData without listeners: remove the listen relationship
    if (!removeListenCachesMap.isEmpty()) {
        // ...
    }
    if (needAllSync) {
        lastAllSyncTime = now;
    }
    // 4. If any configuration changed, trigger executeConfigListen again immediately
    if (hasChangedKeys) {
        notifyListenConfig();
    }
}

CacheData in cacheMap is divided into two maps depending on whether a Listener exists: one for entries with Listeners and one for entries without. For the former, listeners need to be registered; for the latter, listeners need to be removed. (cacheMap is populated when users register listeners via configService.addListener; see chapter one.)

// ClientWorker.ConfigRpcTransportClient#executeConfigListen
// taskId -> cacheData (with listeners)
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
// taskId -> cacheData (without listeners)
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);

In addition, during incremental synchronization (needAllSync = false, i.e. within 5 minutes of the last full sync), CacheData with isSyncWithServer = true does not take part in the listener registration logic and is not counted into either map; only the consistency between cacheData.md5 and listener.md5 is checked, and the Listener is triggered on a mismatch.

// A full sync is needed if >= 5 minutes have passed since the last full synchronization
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
for (CacheData cache : cacheMap.get().values()) {
    synchronized (cache) {
        if (cache.isSyncWithServer()) {
            // 1. For configurations already in sync, re-verify md5 consistency between cacheData and listeners
            cache.checkListenerMd5();
            // Within 5 minutes of the last full sync, skip this cacheData
            if (!needAllSync) {
                continue;
            }
        }
        // ...
    }
    // ...
}

isSyncWithServer = true means server.md5 == client cacheData.md5 == client listener.md5, i.e. the configuration held by CacheData is fully synchronized. Put differently, every 5 minutes the full synchronization re-registers listeners for all CacheData.
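For reference, the md5 check is roughly the following (simplified from CacheData#checkListenerMd5; ManagerListenerWrap remembers the md5 each listener last received):

// CacheData (simplified sketch)
void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            // Content changed since this listener last saw it: notify it with the new content
            safeNotifyListener(dataId, group, content, type, md5, wrap);
        }
    }
}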

2. Registering listeners and synchronizing configuration

For listenCachesMap, call the gRPC interface to register the listener.

// ClientWorker.ConfigRpcTransportClient#executeConfigListen
// 2. Process cacheData with listeners: send a batch listen request
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
    String taskId = entry.getKey();
    List<CacheData> listenCaches = entry.getValue();

    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
    configChangeListenRequest.setListen(true);
    try {
        // Each taskId corresponds to one RpcClient #1
        RpcClient rpcClient = ensureRpcClient(taskId);
        // Register the listeners via gRPC #2
        ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                rpcClient, configChangeListenRequest);
        // Process the gRPC response #3
        if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
            Set<String> changeKeys = new HashSet<String>();
            if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
                hasChangedKeys = true;
                for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                        .getChangedConfigs()) {
                    String changeKey = GroupKey
                            .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
                                    changeConfig.getTenant());
                    changeKeys.add(changeKey);
                    boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                    // Query the latest configuration, update the snapshot, refresh the configuration in CacheData;
                    // notify the listeners unless this config is still initializing
                    refreshContentAndCheck(changeKey, !isInitializing);
                }
            }
            // For unchanged configurations, set syncWithServer = true to reduce sync overhead
            for (CacheData cacheData : listenCaches) {
                String groupKey = GroupKey
                        .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                if (!changeKeys.contains(groupKey)) {
                    synchronized (cacheData) {
                        if (!cacheData.getListeners().isEmpty()) {
                            cacheData.setSyncWithServer(true);
                            continue;
                        }
                    }
                }
                // Mark cacheData as no longer initializing
                cacheData.setInitializing(false);
            }
        }
    } catch (Exception e) {
        // ignore
    }
}

The ensureRpcClient method ensures that each taskId has its own RpcClient, whereas in 1.4 each taskId had its own LongPollingRunnable task.

// ClientWorker.ConfigRpcTransportClient#ensureRpcClient
// Each taskId corresponds to an RpcClient #1
private synchronized RpcClient ensureRpcClient(String taskId) throws NacosException {
    Map<String, String> labels = getLabels();
    Map<String, String> newLabels = new HashMap<String, String>(labels);
    newLabels.put("taskId", taskId);
    // One taskId corresponds to one RpcClient
    // Each RpcClient has its own thread pool
    RpcClient rpcClient = RpcClientFactory
            .createClient("config-" + taskId + "-" + uuid, getConnectionType(), newLabels);
    if (rpcClient.isWaitInitiated()) {
        initRpcClientHandler(rpcClient);
        rpcClient.setTenant(getTenant());
        rpcClient.clientAbilities(initAbilities());
        rpcClient.start();
    }
    return rpcClient;
}
// RpcClientFactory#createClient
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
  String clientNameInner = clientName;
  synchronized (clientMap) {
    if (clientMap.get(clientNameInner) == null) {
      RpcClient moduleClient = null;
      if (ConnectionType.GRPC.equals(connectionType)) {
        moduleClient = new GrpcSdkClient(clientNameInner);
      }
      moduleClient.labels(labels);
      clientMap.put(clientNameInner, moduleClient);
      return moduleClient;
    }
    return clientMap.get(clientNameInner);
  }
}
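For intuition about how configurations map to taskIds, and therefore to RpcClients and long connections, a small sketch assuming the 3000-per-task batch size mentioned earlier (not Nacos source):

public class TaskIdSketch {

    static final int PER_TASK_CONFIG_SIZE = 3000;

    // The index of a config in cacheMap determines its taskId
    static int taskIdFor(int cacheIndex) {
        return cacheIndex / PER_TASK_CONFIG_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(taskIdFor(0));     // 0 -> first RpcClient / long connection
        System.out.println(taskIdFor(2999));  // 0 -> same connection
        System.out.println(taskIdFor(3000));  // 1 -> a second RpcClient is created
    }
}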

The listen registration request is then sent to Nacos Server port 9848. Unlike in 1.4, the server does not hold the request; it returns immediately.

// gRPC registers listener #2
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);

The server-side ConfigChangeBatchListenResponse returns the configuration items whose md5 has changed. The refreshContentAndCheck method queries the latest configuration from the server and updates the snapshot file and CacheData. getServerConfig here is the same as in 1.4, except that it goes over gRPC; checkListenerMd5, as in 1.4, compares the md5 in CacheData with the md5 in each Listener and triggers the Listener if they differ.

// Process the gRPC response #3
private void refreshContentAndCheck(String groupKey, boolean notify) {
    if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
        CacheData cache = cacheMap.get().get(groupKey);
        refreshContentAndCheck(cache, notify);
    }
}

// Refresh the configuration in CacheData and notify the listeners
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
    try {
        String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);
        cacheData.setContent(ct[0]);
        if (null != ct[1]) {
            cacheData.setType(ct[1]);
        }
        cacheData.checkListenerMd5();
    } catch (Exception e) {
        LOGGER.error("refresh content and check md5 fail", e);
    }
}

Since the blocking queue poll times out after 5 seconds, the executeConfigListen method is triggered again and again. To reduce the overhead of frequent full synchronization, CacheData whose configuration does not differ between server and client is marked isSyncWithServer = true, meaning the two sides are consistent and no full synchronization is needed within the 5-minute window.

3. Removing listeners

For CacheData without listeners, the same gRPC interface is invoked; the only difference is that the listen parameter in ConfigBatchListenRequest is false, which deregisters the listeners.

// ClientWorker.ConfigRpcTransportClient#executeConfigListen
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
    String taskId = entry.getKey();
    List<CacheData> removeListenCaches = entry.getValue();
    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
    configChangeListenRequest.setListen(false);
    try {
        RpcClient rpcClient = ensureRpcClient(taskId);
        boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
        if (removeSuccess) {
            for (CacheData cacheData : removeListenCaches) {
                synchronized (cacheData) {
                    if (cacheData.getListeners().isEmpty()) {
                        ClientWorker.this
                                .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                    }
                }
            }
        }

    } catch (Exception e) {
        LOGGER.error("async remove listen config change error ", e); }}Copy the code

4. Waking up the listening task

You can wake up the configuration synchronization task by putting elements into the blocking queue listenExecutebell.

// ClientWorker.ConfigRpcTransportClient#notifyListenConfig
@Override
public void notifyListenConfig() {
    listenExecutebell.offer(bellItem);
}

The configuration synchronization task is woken up when there are elements in the listenExecutebell blocking queue. So when does the configuration synchronization task wake up?

Scenario 1: while the configuration synchronization task is executing, the ConfigChangeBatchListenResponse returned by the server contains changed configurations.

// ClientWorker.ConfigRpcTransportClient#executeConfigListen
// 4. If any configuration changed, trigger executeConfigListen again immediately
public void executeConfigListen() {
    // ...
    if (hasChangedKeys) {
        notifyListenConfig();
    }
}

Scenario 2: when a Listener is added, the configuration must be synchronized immediately. Note that the CacheData is also marked syncWithServer = false, forcing configuration synchronization to ensure consistency between the two ends.

// ClientWorker
public void addListeners(String dataId, String group, List<? extends Listener> listeners) {
    group = null2defaultGroup(group);
    CacheData cache = addCacheDataIfAbsent(dataId, group);
    synchronized (cache) {
        for (Listener listener : listeners) {
            cache.addListener(listener);
        }
        // Force sync and wake the sync task immediately
        cache.setSyncWithServer(false);
        agent.notifyListenConfig();
    }
}

Scenario 3: the server sends a ConfigChangeNotifyRequest, indicating that a configuration item has changed and the client needs to synchronize it.

// ClientWorker.ConfigRpcTransportClient#initRpcClientHandler
private void initRpcClientHandler(final RpcClient rpcClientInner) {
    rpcClientInner.registerServerRequestHandler((request) -> {
        if (request instanceof ConfigChangeNotifyRequest) {
            ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
            String groupKey = GroupKey
                    .getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
                            configChangeNotifyRequest.getTenant());

            CacheData cacheData = cacheMap.get().get(groupKey);
            if (cacheData != null) {
                cacheData.setSyncWithServer(false);
                notifyListenConfig();
            }
            return new ConfigChangeNotifyResponse();
        }
        return null;
    });
}

Other scenarios are not listed one by one; in short, the configuration synchronization task is woken up whenever the client needs to become aware of a configuration change.

3. The 2.0 server handles listening requests

On the server side, ConfigChangeBatchListenRequestHandler handles ConfigBatchListenRequest.

@Component
public class ConfigChangeBatchListenRequestHandler
        extends RequestHandler<ConfigBatchListenRequest, ConfigChangeBatchListenResponse> {

    @Autowired
    private ConfigChangeListenContext configChangeListenContext;

    @Override
    @TpsControl(pointName = "ConfigListen")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public ConfigChangeBatchListenResponse handle(ConfigBatchListenRequest configChangeListenRequest,
            RequestMeta meta) throws NacosException {
        String connectionId = StringPool.get(meta.getConnectionId());
        String tag = configChangeListenRequest.getHeader(Constants.VIPSERVER_TAG);

        ConfigChangeBatchListenResponse configChangeBatchListenResponse = new ConfigChangeBatchListenResponse();
        for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest
                .getConfigListenContexts()) {
            String groupKey = GroupKey2
                    .getKey(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
            groupKey = StringPool.get(groupKey);

            String md5 = StringPool.get(listenContext.getMd5());

            if (configChangeListenRequest.isListen()) {
                // Save the connectionId -> groupKey and groupKey -> md5 relationships on the server
                configChangeListenContext.addListen(groupKey, md5, connectionId);
                // Check whether the configuration has changed; if so, add the changed groupKey to the response
                boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
                if (!isUptoDate) {
                    configChangeBatchListenResponse
                            .addChangeConfig(listenContext.getDataId(), listenContext.getGroup(),
                                    listenContext.getTenant());
                }
            } else {
                configChangeListenContext.removeListen(groupKey, connectionId);
            }
        }
        return configChangeBatchListenResponse;
    }
}

For a listen-registration request, the server saves two mappings in ConfigChangeListenContext: connectionId -> (groupKey -> client md5), and groupKey -> set of connectionIds. For example, after connection c1 registers listeners on keys k1 and k2, connectionIdContext = {c1: {k1: md5a, k2: md5b}} and groupKeyContext = {k1: [c1], k2: [c1]}.

The former mapping is for console display only;

The latter mapping is mainly used to find subscribing clients to be notified of configuration items that change later.

@Component
public class ConfigChangeListenContext {

    /** groupKey -> connectionId set. */
    private ConcurrentHashMap<String, HashSet<String>> groupKeyContext = new ConcurrentHashMap<String, HashSet<String>>();

    /** connectionId -> (groupKey -> md5). */
    private ConcurrentHashMap<String, HashMap<String, String>> connectionIdContext = new ConcurrentHashMap<String, HashMap<String, String>>();

    public synchronized void addListen(String groupKey, String md5, String connectionId) {
        // 1. add to groupKeyContext
        Set<String> listenClients = groupKeyContext.get(groupKey);
        if (listenClients == null) {
            groupKeyContext.putIfAbsent(groupKey, new HashSet<String>());
            listenClients = groupKeyContext.get(groupKey);
        }
        listenClients.add(connectionId);

        // 2. add to connectionIdContext
        HashMap<String, String> groupKeys = connectionIdContext.get(connectionId);
        if (groupKeys == null) {
            connectionIdContext.putIfAbsent(connectionId, new HashMap<String, String>(16));
            groupKeys = connectionIdContext.get(connectionId);
        }
        groupKeys.put(groupKey, md5);
    }
}

In addition, similar to 1.4 Long Polling logic, if the MD5 of the client is inconsistent with the MD5 of the server, the server returns the inconsistent groupKey.

// ConfigChangeBatchListenRequestHandler#handle
// Check whether the configuration has changed; if so, add the changed groupKey to the response
boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
if (!isUptoDate) {
    configChangeBatchListenResponse
            .addChangeConfig(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
}
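The up-to-date check itself is essentially an md5 comparison; roughly (a simplified sketch that ignores the client-ip and tag overloads):

// ConfigCacheService (simplified sketch)
public static boolean isUptodate(String groupKey, String md5) {
    // Compare the md5 the client reported with the md5 of the config the server holds
    String serverMd5 = ConfigCacheService.getContentMd5(groupKey);
    return StringUtils.equals(md5, serverMd5);
}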

To remove a listener, the server simply removes the above mappings from ConfigChangeListenContext, roughly as sketched below.
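A rough sketch of ConfigChangeListenContext#removeListen (simplified): it undoes both mappings that addListen created.

// ConfigChangeListenContext (simplified sketch)
public synchronized void removeListen(String groupKey, String connectionId) {
    // 1. remove the connectionId from groupKeyContext
    Set<String> connectionIds = groupKeyContext.get(groupKey);
    if (connectionIds != null) {
        connectionIds.remove(connectionId);
        if (connectionIds.isEmpty()) {
            groupKeyContext.remove(groupKey);
        }
    }
    // 2. remove the groupKey from connectionIdContext
    HashMap<String, String> groupKeys = connectionIdContext.get(connectionId);
    if (groupKeys != null) {
        groupKeys.remove(groupKey);
    }
}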

4. 2.0 server configuration publishing

After the local and memory configurations of the Nacos server are updated, the LocalDataChangeEvent event is published.

In version 1.4, the server handles LocalDataChangeEvent through LongPollingService.

The HTTP long polling logic still exists in 2.0 and has not been removed, but for 2.0 long-connection clients the LocalDataChangeEvent is handled by RpcConfigChangeNotifier.

@Component(value = "rpcConfigChangeNotifier")
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {

    public RpcConfigChangeNotifier() {
        NotifyCenter.registerSubscriber(this);
    }

    @Override
    public void onEvent(LocalDataChangeEvent event) {
        String groupKey = event.groupKey;
        boolean isBeta = event.isBeta;
        List<String> betaIps = event.betaIps;
        String[] strings = GroupKey.parseKey(groupKey);
        String dataId = strings[0];
        String group = strings[1];
        String tenant = strings.length > 2 ? strings[2] : "";
        String tag = event.tag;
        configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
    }
}

configDataChanged obtains, from the listening context ConfigChangeListenContext, all listening connectionIds for the groupKey, then uses ConnectionManager to obtain the gRPC long Connection corresponding to each connectionId. Finally it builds the ConfigChangeNotifyRequest and submits an RpcPushTask to a separate thread service so that other event processing is not blocked.

// RpcConfigChangeNotifier
public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
        List<String> betaIps, String tag) {
    // From the listening context, get all listening connectionIds for the groupKey
    Set<String> listeners = configChangeListenContext.getListeners(groupKey);
    if (!CollectionUtils.isEmpty(listeners)) {
        for (final String client : listeners) {
            // Obtain the actual gRPC long connection by connectionId
            Connection connection = connectionManager.getConnection(client);
            if (connection == null) {
                continue;
            }
            // ...
            // Build the notify request
            ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
            // Submit the task to another thread pool to avoid blocking other event processing
            RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp,
                    connection.getMetaInfo().getAppName());
            push(rpcPushRetryTask);
        }
    }
}

The push method has three branches:

If the task has been retried more than 50 times, ConnectionManager closes the long connection.

If the connectionId still exists in ConnectionManager, the task is scheduled with delay compensation: after each failure the retry is delayed by tryTimes × 2 seconds (first retry after 2s, second after 4s, and so on).

If the connection no longer exists, nothing is done.

// RpcConfigChangeNotifier
private void push(RpcPushTask retryTask) {
    ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;
    if (retryTask.isOverTimes()) {
        // Retried more than 50 times: give up and close the long connection
        connectionManager.unregister(retryTask.connectionId);
        return;
    } else if (connectionManager.getConnection(retryTask.connectionId) != null) {
        // Delay compensation: the delay grows with the number of failures (tryTimes * 2 seconds)
        ConfigExecutor.getClientConfigNotifierServiceExecutor()
                .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
    } else {
        // Client is already offline, ignore the task
    }
}

RpcPushTask is an inner class of RpcConfigChangeNotifier; its run method handles the fault-tolerance logic.

  • If the traffic is limited, resubmit the Task
  • If the RPC fails, resubmit the Task
class RpcPushTask implements Runnable {

    @Override
    public void run() {
        tryTimes++;
        // If rate limited, resubmit the task
        if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
            push(this);
        } else {
            // Send ConfigChangeNotifyRequest to the client over gRPC
            rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
                @Override
                public void onSuccess() {
                    tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
                }

                @Override
                public void onFail(Throwable e) {
                    tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
                    // On failure, resubmit the task
                    push(RpcPushTask.this);
                }
            }, ConfigExecutor.getClientConfigNotifierServiceExecutor());
        }
    }
}

For the client logic, see Scenario 3 in 2.4 Waking up a Listening Task.

conclusion

The main change in the 2.x configuration center is the introduction of long connections to replace short-connection long polling.

Client changes:

Change point 1:

In 1.x, the client starts one LongPollingRunnable long polling task per 3000 CacheData. In 2.x, the client starts one RpcClient per 3000 CacheData, and each RpcClient establishes a long connection with the server.

Change point 2:

The client adds logic for a scheduled full pull of the configuration.

In 1.x, the Nacos configuration center updated client configuration only through the long polling mode; there was only push for the client.

2.x adds scheduled synchronization on the client, so 2.x combines push and pull.

Pull: every five minutes, the client sends a ConfigBatchListenRequest carrying the full set of CacheData. If any configuration md5 has changed, the client receives the changed configuration items in the response and sends a ConfigQuery request to fetch the latest configuration.

Push: when a configuration changes on the server, the server sends a ConfigChangeNotifyRequest over the long connections held by the current node to notify clients that the configuration item has changed.

Server side changes:

Change point 1:

Because 2.x uses long connections instead of long polling, ConfigBatchListenRequest is no longer held by the server and returns immediately. The server simply keeps the listening relationship in memory for subsequent notifications.

The mapping from groupKey to connectionId makes it possible to find the client long connections for a changed configuration item; the mapping from connectionId to groupKey is for console display only. Both relationships are stored in the server-side ConfigChangeListenContext singleton.

Change point 2:

Following from change point 1: in 1.x, the server had to find, via groupKey, the client AsyncContexts still held in long polling. In 2.x, the server finds the connectionIds via groupKey, looks up the corresponding long connections, and sends ConfigChangeNotifyRequest to notify the clients of the configuration change.