Nacos serves as the configuration center. When applications access Nacos to dynamically obtain configuration sources, they are cached in local memory and disk. Since Nacos is a dynamic configuration center, subsequent configuration changes need to be made aware of and updated to local memory by all relevant clients! So where is this functionality implemented? And how does it implement configuration updates? Let’s explore the implementation of the source code!

The client configures cache updates

When the client receives the configuration, it needs to refresh dynamically to ensure that the data is consistent with the server. How does this process work? We will do a detailed analysis in this section.

Nacos uses the long rotation mechanism to realize the synchronization of data changes, the principle is as follows!

The overall working process is as follows:

  • The client sends a long round training request
  • After receiving the request, the server compares the data in the cache of the server for consistency. If no, the server returns the data directly
  • If they are the same, the comparison is performed after 29.5s delay by schedule
  • In order to ensure that the server can timely notify the client when data changes occur within 29.5s, the server uses the event subscription method to monitor the event of local data changes on the server. Once the event is received, the notification of DataChangeTask will be triggered. After traversing the allStubs queue ClientLongPolling, write the result back to the client, and complete a data push
  • What if after the DataChangeTask task has “pushed” the data, the scheduling task in ClientLongPolling starts executing again? It is easy to cancel the scheduled task before the “push” operation. This prevents the scheduled task from writing the response data after the push operation, which will definitely report an error. So, the first step in the ClientLongPolling method is to delete the subscription event

Long rotation mission start entrance

In the constructor of NacosConfigService, something happens when the class is instantiated

  • The actual working class is ServerHttpAgent. MetricsHttpAgent also calls the ServerHttpAgent method inside MetricsHttpAgent, adding monitoring statistics information
  • ClientWorker: a working class of the client. The agent is passed in as a parameter to the ClientWorker. You can basically guess that the agent will be used to do some remote related things
public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties); //
    this.configFilterChainManager = new ConfigFilterChainManager(properties);
    // Initialize the network communication component
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start(); 
    // Initialize ClientWorker
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
Copy the code

ClientWorker

In the initialization code above, we focus on the ClientWorker class, which has the following constructor

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager; // Initialize the filter manager
    
    // Initialize the timeout parameter
    
    init(properties); // Initialize the configuration
    
    // Initialize a timed thread pool, overriding the threadFactory method
    this.executor = Executors.newScheduledThreadPool(1.new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            returnt; }});// Initialize a timed thread pool. The long rotation should be with the nacOS server
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    returnt; }});// Set the execution frequency of the scheduled task and call the checkConfigInfo method to check whether the configuration has changed
        // The first execution delay is 1 ms and the first execution delay is 10 ms
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run(a) {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); }}},1L.10L, TimeUnit.MILLISECONDS);
}
Copy the code

As you can see, in addition to maintaining HttpAgent internally, the ClientWorker creates two thread pools:

  1. The first thread pool is an executor that has only one thread to execute a scheduled task. The executor executes the checkConfigInfo() method every 10ms.

  2. The second thread pool is a normal thread pool that, as you can see from the name ThreadFactory, does long polling.

checkConfigInfo

During ClientWorker construction initialization, a scheduled task is initiated to execute the checkConfigInfo() method, which periodically checks for changes to the local configuration and configuration on the server, as defined below.

public void checkConfigInfo(a) {
    // Dispatch tasks.
    int listenerSize = cacheMap.size(); //
    // Round up the longingTaskCount.
     // The number of monitored configurations is divided by 3000 to obtain an integer representing the number of long round training tasks
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
     / / currentLongingTaskCount said the current number of long training in rotation task, if less than the results of calculation, you can continue to create
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(newLongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; }}Copy the code

The main purpose of this method is to check whether the configuration information of the server has changed. If there is a change, a listener notification is triggered

  • CacheMap: AtomicReference

    > cacheMap stores the cache set that listens for changes. Key is the value concatenated according to dataID/ Group/Tenant. Value is the contents of the corresponding configuration file stored on the NACOS server.

  • By default, each LongPullingRunnable task handles 3000 listening configuration sets by default. If it exceeds 3000, multiple LongPollingRunnable needs to be started to execute.

  • CurrentLongingTaskCount Saves the number of started LongPullingRunnable tasks

  • ExecutorService is the thread pool that is initialized in the ClientWorker constructor

LongPollingRunnable.run

The implementation logic of LongPollingRunnable long rotation training task has a long code, so we will analyze it in sections.

The first part has two main pieces of logic

  1. Classify tasks by batch
  2. Checks for consistency between the current batch’s cache and the local file’s data, and triggers a listen if it changes.
class LongPollingRunnable implements Runnable {
    
    private final int taskId; // Indicates the batch ID of the current task
    
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    
    @Override
    public void run(a) {
        
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // Iterate over CacheMap and save the cache of CacheMap with the same task ID to cacheDatas
            // Through the checkLocalConfig method
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) { // The listener needs to be notified that the data has changed
                            cacheData.checkListenerMd5(); // Notify all listeners that have been set up for the current configuration}}catch (Exception e) {
                        LOGGER.error("get local config info error", e); }}}// omit part
            
        } catch (Throwable e) {
            
            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); // An exception occurs and the task will be executed again after the next taskPenaltyTime}}}Copy the code

checkLocalConfig

Check the local configuration. There are three cases

  • If isUseLocalConfigInfo is false, the local configuration is not used, but the file in the local cache path exists, so set isUseLocalConfigInfo to true and update the contents of cacheData and the update time of the file
  • If isUseLocalConfigInfo is true, the local configuration file is used, but the local cache file does not exist, then set to false and do not notify the listener.
  • If isUseLocalConfigInfo is true and a local cache file exists, but the cached time is inconsistent with the file update time, the contents of cacheData are updated and isUseLocalConfigInfo is set to true.
private void checkLocalConfig(CacheData cacheData) {
    final String dataId = cacheData.dataId;
    final String group = cacheData.group;
    final String tenant = cacheData.tenant;
    File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
    // No -> yes
    if(! cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        
        LOGGER.warn(
                "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
                agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
        return;
    }
     // Yes -> no. Do not notify the business listener when it gets the configuration from the server.
    // If use local config info, then it doesn't notify business listener and notify after getting from server.
    if(cacheData.isUseLocalConfigInfo() && ! path.exists()) { cacheData.setUseLocalConfigInfo(false);
        LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
                dataId, group, tenant);
        return;
    }
    
     / / is subject to change
    if(cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() ! = path .lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cacheData.setEncryptedDataKey(encryptedDataKey);
        LOGGER.warn(
                "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); }}Copy the code

checkListenerMd5

The listeners added by the user are iterated, and if the MD5 values of the data are found to be different, a notification is sent

void checkListenerMd5(a) {
    for (ManagerListenerWrap wrap : listeners) {
        if(! md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); }}}Copy the code

Check the server configuration

In LongPollingRunnable. Run, the local configuration is read and checked to determine whether the data changes, so as to realize the change notification

The current thread then needs to go to the remote server to get the latest data and check what has changed

  • Get the dataid of the data change on the remote server by checkUpdateDataIds
  • Iterate through the collection of changes and call getServerConfig to get the corresponding content from the remote server
  • Update the local cache to the content returned by the server
  • The cacheDatas is finally traversed to find changed data to notify
// check server config
// Get the List of dataids for the changed data from the server in the List
      
        collection
      
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if(! CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
 
// Iterate over the configuration items that have changed
for (String groupKey : changedGroupKeys) {
    String[] key = GroupKey.parseKey(groupKey);
    String dataId = key[0];
    String group = key[1];
    String tenant = null;
    if (key.length == 3) {
        tenant = key[2];
    }
    try {
        // Obtain configuration information item by item based on these configuration items
        ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
        // Save configuration information to CacheData
        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
        cache.setContent(response.getContent());
        cache.setEncryptedDataKey(response.getEncryptedDataKey());
        if (null! = response.getConfigType()) { cache.setType(response.getConfigType()); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
    } catch (NacosException ioe) {
        String message = String
            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); }}// Iterate through the CacheData collection to find data that has changed for notification
for (CacheData cacheData : cacheDatas) {
    if(! cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false);
    }
}
inInitializingCacheList.clear();
 // Continue to pass the current thread for polling
executorService.execute(this);
Copy the code

checkUpdateDataIds

This method sends a check request to the server to check whether the local configuration is consistent with that on the server.

  • Start by finding the cache with isUseLocalConfigInfo false from the cacheDatas collection
  • Concatenate the configuration items to be checked into a string and call checkUpdateConfigStr for verification
/** * Get the list of DataID values changed from Server. Only dataId and Group of the returned objects are valid. Ensure that NULL is not returned. * /
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) { // Concatenate the configuration items to be checked into a string
        if(! cacheData.isUseLocalConfigInfo()) {// Find the cache for isUseLocalConfigInfo=false
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {//
                // cacheData appears for the first time in cacheMap & first check updateinInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); }}}booleanisInitializingCacheList = ! inInitializingCacheList.isEmpty();return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}

Copy the code

checkUpdateConfigStr

Get a list of dataids from the Server whose values have changed. Only dataId and Group of the returned objects are valid. Ensure that NULL is not returned.

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    
    // Concatenate parameters with headers
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    headers.put("Long-Pulling-Timeout"."" + timeout);
    
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup"."true");
    }
    
    if (StringUtils.isBlank(probeUpdateString)) {// Determine whether the string that can be changed is empty, if so, return directly.
        return Collections.emptyList();
    }
    
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        // Set readTimeoutMs, which is the timeout time for this request to wait for a response. Default is 30s
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        // Initiate a remote call
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        
        if (result.ok()) { // If the response succeeds
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData()); // Parse and update the data and return the string tenant/group/ datAID that did change the data.
        } else {// If the response fails
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.getCode()); }}catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}
Copy the code

Summary of client cache configuration long rotation mechanism

The core of the overall implementation is the following

  1. Split tasks for local cache configuration, each batch is 3000
  2. Create one thread for every 3000 to execute
  3. The cache of each batch is first compared to the data in the local disk file,
    1. If it is inconsistent with the local configuration, the cache is updated and the client listener is notified
    2. If the local cache and disk data are consistent, you need to send a remote request to check the configuration changes
  4. Tenent /groupId/dataId is used to concatenate the string and send it to the server for checking and return the changed configuration
  5. After receiving the list of changed configurations, the client iterates the list item by item and sends it to the server to obtain the configuration content.

Server configuration update push

After analyzing the client, out of curiosity, how does the server handle requests from the client? So again, we need to think about a couple of things

  • How does the server implement the long rotation mechanism
  • Why is the client timeout set to 30s

The client sends a request to /v1/cs/configs/listener.

//# ConfigController.java
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED".true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    
    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
    
    Map<String, String> clientMd5Map;
    try {
        // Parse the configuration items that may change from the client into a Map set (key=dataId,value= MD5)
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }
    
    // Start executive rotation.
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
Copy the code

doPollingConfig

This method is mainly used to do long rotation training and short polling judgment

  1. For a long rotation, use the addLongPollingClient method
  2. In short polling, the data on the server is directly compared. If md5 inconsistency exists, the data is directly returned.
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
    
    // Determine whether the current request supports long rotation training. (a)
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    
    // For short polling, go to the following request, which compares the data sent from the client to the server item by item and saves it in changeGroups.
    // Compatible with short polling logic.
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    
    // Compatible with short polling result.
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    
    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    
    // Before 2.0.4 version, return value is put into header.
    if (versionNum < START_LONG_POLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    
    Loggers.AUTH.info("new content:" + newResult);
    
    // Disable cache.
    response.setHeader("Pragma"."no-cache");
    response.setDateHeader("Expires".0);
    response.setHeader("Cache-Control"."no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}
Copy the code

addLongPollingClient

Save the client’s request to the execution engine of the long rotation.

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {
    // Obtain the timeout period of the client long rotation training
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); 
    // Disallow the break flag
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    // Application name
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    //
    String tag = req.getHeader("Vipserver-Tag");
    // delay time, default is 500ms
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    // Return a response 500ms ahead of time to avoid client timeout
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        // Use MD5 to check whether the key received by the client is inconsistent with the key received by the server. If yes, save the key to changedGroups.
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) { // If a change is found, the request is returned directly to the client
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info(| | | "{} {} {} {} | | {} {} | {}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if(noHangUpFlag ! =null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { // If noHangUpFlag is true, the client does not need to be suspended, so return directly.
            LogUtil.CLIENT_LOG.info(| | | "{} {} {} {} | | {} {} | {}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return; }}// Get the IP address of the requester
    String ip = RequestUtil.getRemoteIp(req);

    // Must be called by http thread, or send response.
    // Convert the current request to an asynchronous request (which means that the Tomcat thread is released, i.e. the client request needs to be manually triggered to return via asyncContext, otherwise it will be suspended)
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L); // Set asynchronous request timeout,
    // CEO rotation training request
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
Copy the code

ClientLongPolling

Let’s take a look at what clientLongPolling actually does. Or can we guess what we should be doing

  • This task will block for 29.5 seconds to execute, since it doesn’t make sense to execute it immediately, since it has already been executed once before
  • If the data changes within 29.5s+, advance notice is required. There needs to be a monitoring mechanism

Based on these assumptions, we can see how it works

From the coarse-grained code, its implementation seems to be consistent with our guess. In the run method, a scheduled task is implemented through scheduler.schedule, and its delay time is exactly 29.5s as previously calculated. In this task, MD5Util.compareMd5 is used for calculation

As for the other one, when the data changes, you can’t wait until 29.5 seconds before the notification. We found out there was an allSubs thing, and it seemed to have something to do with publishing subscriptions. Is it possible that the current clientLongPolling subscribed to events that changed the data?

class ClientLongPolling implements Runnable {

    @Override
    public void run(a) {
        // Build an asynchronous task and delay execution 29.5s
        asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
            @Override
            public void run(a) { // If 29.5s is reached, no configuration changes are made during this period, and the execution is automatically triggered
                try {
                    getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());

                    // Delete subsciber's relations.
                    allSubs.remove(ClientLongPolling.this); // Remove the subscription

                    if (isFixedPolling()) { // If it is a long rotation training with fixed intervals
                        LogUtil.CLIENT_LOG
                                .info(| | | "{} {} {} {} | | {} {}", (System.currentTimeMillis() - createTime), "fix",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        // Compare the changed keys
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        if (changedGroups.size() > 0) {// If the value is greater than 0, it indicates that there is a change
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null); // Otherwise null is returned}}else {
                        LogUtil.CLIENT_LOG
                                .info(| | | "{} {} {} {} | | {} {}", (System.currentTimeMillis() - createTime), "timeout",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        sendResponse(null); }}catch (Throwable t) {
                    LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                }

            }

        }, timeoutTime, TimeUnit.MILLISECONDS);

        allSubs.add(this);  // Add the current thread to the subscription event queue}}Copy the code

allSubs

AllSubs is a queue containing the object ClientLongPolling. This queue seems to be associated with configuration changes.

So what must be done here is that when the user changes the configuration in the NacOS console, he must take the client long connection of interest out of the subscription and return the result of the change. So we look at the constructor of the LongPollingService to find the subscription relationship

/** * long polling subscription */
final Queue<ClientLongPolling> allSubs;

allSubs.add(this);
Copy the code

LongPollingService

In the constructor of LongPollingService, a NotifyCenter is used to subscribe to an event. It is easy to see that if the instance of the event is LocalDataChangeEvent, that is, the time when the server data changes, A DataChangeTask thread is executed.

public LongPollingService(a) {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L.10L, TimeUnit.SECONDS);

    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);

    // Register LocalDataChangeEvent to subscribe to events
    NotifyCenter.registerSubscriber(new Subscriber() {

        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) { // If LocalDataChangeEvent is triggered, execute the following code
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(newDataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); }}}@Override
        public Class<? extends Event> subscribeType() {
            returnLocalDataChangeEvent.class; }}); }Copy the code

DataChangeTask

Data change event thread, code as follows

class DataChangeTask implements Runnable {

    @Override
    public void run(a) {
        try {
            ConfigCacheService.getContentBetaMd5(groupKey); //
            // Iterate over all subscribed event tables
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next(); / / get ClientLongPolling
                // Check whether the requested key in the current ClientLongPolling contains the currently modified groupKey
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // If published tag is not in the beta list, then it skipped.
                    if(isBeta && ! CollectionUtils.contains(betaIps, clientSub.ip)) {// If it is in beta mode and betaIps does not contain the current client IP address, return directly
                        continue;
                    }

                    // If published tag is not in the tag list, then it skipped.
                    if(StringUtils.isNotBlank(tag) && ! tag.equals(clientSub.tag)) {// If the tag is configured and does not contain the current client's tag, this field is returned
                        continue;
                    }
					//
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // Delete subscribers' relationships. Remove a subscription from the current client
                    LogUtil.CLIENT_LOG
                            .info(| | | "{} {} {} {} | | {} {} | {}", (System.currentTimeMillis() - changeTime), "in-advance",
                                    RequestUtil
                                            .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                    "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                    clientSub.sendResponse(Arrays.asList(groupKey)); // Respond to a client request.}}}catch (Throwable t) {
            LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t)); }}}Copy the code

The principle of summary

Copyright Notice: All articles on this blog are subject to a CC BY-NC-SA 4.0 license unless otherwise stated. Reprint please specify from Mic to take you to learn structure! If this article is helpful to you, please also help to point attention and like, your persistence is the power of my continuous creation. Welcome to pay attention to the same wechat public account for more technical dry goods!