Preface
Nacos 1.4 uses HTTP short connections with long polling. The client sends an HTTP request; the server holds the request and responds when the configuration changes. The timeout is 30s.
Port | Port offset | Type | Implementation | Description |
---|---|---|---|---|
8848 | 0 | HTTP | SpringBoot | Console access, cluster communication, client communication. |
7848 | -1000 | gRPC | JRaftServer | JRaft service. |
Nacos 2.0 replaces HTTP short-connection long polling with gRPC long connections. Configuration synchronization combines push and pull.
- Pull: The client periodically registers its listeners with the server and updates its local configuration if the configuration has changed.
- Push: When a configuration changes, the server notifies the clients listening on it; each client then registers its listeners again and updates its configuration.
Port | Port offset | Type | Implementation | Description |
---|---|---|---|---|
8848 | 0 | HTTP | SpringBoot | Console access. |
7848 | -1000 | gRPC | JRaftServer | JRaft service. |
9848 | 1000 | gRPC | GrpcSdkServer | Handles Nacos client requests. |
9849 | 1001 | gRPC | GrpcClusterServer | Nacos server cluster communication. |
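The offsets in the table are relative to the main server port. A minimal sketch of that arithmetic, assuming the default main port 8848; the class and method names are illustrative and are not part of Nacos:

```java
final class NacosPorts {
    // Offsets taken from the table above.
    static final int SDK_GRPC_OFFSET = 1000;
    static final int CLUSTER_GRPC_OFFSET = 1001;
    static final int RAFT_OFFSET = -1000;

    static int sdkGrpcPort(int mainPort) {
        return mainPort + SDK_GRPC_OFFSET;
    }

    static int clusterGrpcPort(int mainPort) {
        return mainPort + CLUSTER_GRPC_OFFSET;
    }

    static int raftPort(int mainPort) {
        return mainPort + RAFT_OFFSET;
    }

    public static void main(String[] args) {
        int mainPort = 8848;
        System.out.println("client gRPC port:  " + sdkGrpcPort(mainPort));     // 9848
        System.out.println("cluster gRPC port: " + clusterGrpcPort(mainPort)); // 9849
        System.out.println("JRaft port:        " + raftPort(mainPort));        // 7848
    }
}
```

If the main port is changed, the derived ports move with it, so all of them need to be reachable through firewalls and load balancers.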
1.4 Long Polling
Recall that in version 1.4, constructing the ClientWorker starts two executor services:
- An executor that checks for and submits LongPollingRunnable long polling tasks (deprecated in 2.0).
When the ClientWorker is constructed, a scheduled task is started that runs the checkConfigInfo method every 10ms. checkConfigInfo checks the current number of CacheData entries and decides whether to start another long polling task: if the current number of long polling tasks is less than Math.ceil(cacheMap.size() / 3000.0), a new long polling task is started. With only a few configuration files, there is at most one long polling task.
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    // ...
    // Check for and submit LongPollingRunnable tasks to this.executorService
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
- An executor that runs the LongPollingRunnable tasks. Each task sends a long polling request with a 30-second timeout to the server, and the server responds as soon as it detects a configuration change (deprecated in 2.0).
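The task-count rule described above can be sketched as follows. This is a minimal illustration, assuming 3000 CacheData entries per task as stated in the text; the class, constant, and method names are hypothetical and not taken from the Nacos source:

```java
final class LongPollingTaskCount {
    // Assumption from the text: one long polling task per 3000 CacheData entries.
    // Declared as double so the division below is not truncated before Math.ceil.
    static final double PER_TASK_CONFIG_SIZE = 3000;

    /** Number of long polling tasks needed for the given number of CacheData entries. */
    static int requiredTasks(int cacheDataCount) {
        return (int) Math.ceil(cacheDataCount / PER_TASK_CONFIG_SIZE);
    }

    public static void main(String[] args) {
        System.out.println(requiredTasks(1));    // 1
        System.out.println(requiredTasks(3000)); // 1
        System.out.println(requiredTasks(3001)); // 2
    }
}
```

Dividing by a double matters here: with integer division, 3001 / 3000 would already be 1 before Math.ceil is applied, and the second task would never start.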
2.0 Client listening configuration
Since 2.0, configuration listening no longer uses HTTP short-connection long polling; it uses a long connection instead.
When ClientWorker is constructed, it creates a ConfigRpcTransportClient, injects an executor service into it, and calls its start method.
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
        final Properties properties) throws NacosException {
    this.configFilterChainManager = configFilterChainManager;
    init(properties);
    agent = new ConfigRpcTransportClient(properties, serverListManager);
    ScheduledExecutorService executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker");
                    t.setDaemon(true);
                    return t;
                }
            });
    agent.setExecutor(executorService);
    agent.start();
}
The startup logic of ConfigRpcTransportClient (startInternal) submits a configuration synchronization task to the injected executor service; the task runs in an infinite loop.
// ConfigRpcTransportClient
@Override
public void startInternal() throws NacosException {
    executor.schedule(new Runnable() {
        @Override
        public void run() {
            while (true) {
                try {
                    // Wait to be woken up, or time out after 5s
                    listenExecutebell.poll(5L, TimeUnit.SECONDS);
                    // Run configuration listening
                    executeConfigListen();
                } catch (Exception e) {
                    LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
                }
            }
        }
    }, 0L, TimeUnit.MILLISECONDS);
}
Let’s take a look at the ConfigRpcTransportClient properties.
public class ConfigRpcTransportClient extends ConfigTransportClient {
    private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<Object>(1);
    private Object bellItem = new Object();
    private long lastAllSyncTime = System.currentTimeMillis();
}
- BlockingQueue listenExecutebell: a blocking queue with capacity 1. Putting an element into it wakes up the configuration synchronization task that is blocked on the queue.
- Object bellItem: a plain Object that serves as the element put into the blocking queue.
- long lastAllSyncTime: timestamp of the last full synchronization.
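The capacity-1 queue acts as a "bell": any number of wake-up requests collapse into one pending signal, and the waiting side either consumes the signal or times out. A minimal standalone sketch of that pattern, using only java.util.concurrent; the ListenBell class and its method names are illustrative, not Nacos API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class ListenBell {
    private final BlockingQueue<Object> bell = new ArrayBlockingQueue<>(1);
    private final Object bellItem = new Object();

    /** Requests a wake-up. Returns false if a wake-up is already pending. */
    boolean ring() {
        // offer never blocks; it returns false when the queue is full
        return bell.offer(bellItem);
    }

    /** Waits for a wake-up. Returns true if rung, false on timeout or interrupt. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return bell.poll(timeout, unit) != null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ListenBell bell = new ListenBell();
        bell.ring();
        bell.ring(); // coalesced with the first ring
        System.out.println(bell.await(5, TimeUnit.SECONDS));        // true: woken by the ring
        System.out.println(bell.await(100, TimeUnit.MILLISECONDS)); // false: timed out
    }
}
```

Because offer drops the element when the queue is full, callers can ring as often as they like without blocking and without queuing up redundant synchronization runs.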
1. cacheMap grouping
Let’s look at how the configuration synchronization task works. The executeConfigListen method runs whenever an element is put into the blocking queue or the 5s wait times out.
// ClientWorker.ConfigRpcTransportClient
/**
 * groupKey -> cacheData.
 */
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
        new HashMap<String, CacheData>());

@Override
public void executeConfigListen() {
    // taskId -> CacheData (with listeners)
    Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
    // taskId -> CacheData (without listeners)
    Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
    long now = System.currentTimeMillis();
    // A full synchronization is needed if at least 5 minutes have passed since the last one
    boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
    for (CacheData cache : cacheMap.get().values()) {
        synchronized (cache) {
            if (cache.isSyncWithServer()) {
                // 1. For configurations that are already synchronized, re-check md5 consistency between cacheData and its listeners
                cache.checkListenerMd5();
                // Less than 5 minutes since the last full synchronization: skip this configuration
                if (!needAllSync) {
                    continue;
                }
            }
            // Listeners are registered
            if (!CollectionUtils.isEmpty(cache.getListeners())) {
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<CacheData>();
                        listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    cacheDatas.add(cache);
                }
            }
            // No listeners are registered
            else if (CollectionUtils.isEmpty(cache.getListeners())) {
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<CacheData>();
                        removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    cacheDatas.add(cache);
                }
            }
        }
    }
    boolean hasChangedKeys = false;
    // 2. Process CacheData that has listeners and send a listen request
    if (!listenCachesMap.isEmpty()) {
        // ...
    }
    // 3. Remove listening
    if (!removeListenCachesMap.isEmpty()) {
        // ...
    }
    if (needAllSync) {
        lastAllSyncTime = now;
    }
    // 4. If any configuration changed, trigger executeConfigListen again immediately
    if (hasChangedKeys) {
        notifyListenConfig();
    }
}
The CacheData entries in cacheMap are split into two maps according to whether they have listeners. Entries with listeners need to be registered for listening on the server; entries without listeners need their listening removed. (cacheMap is populated when the user registers a listener through ConfigService.addListener; see chapter one.)
// ClientWorker.ConfigRpcTransportClient#executeConfigListen
// taskId -> CacheData (with listeners)
Map<String, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
// taskId -> CacheData (without listeners)
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
In addition, outside of a full synchronization (needAllSync = false, that is, less than 5 minutes since the last one), CacheData with isSyncWithServer = true does not take part in listener registration and is not placed in either map. For these entries, only the consistency between the CacheData MD5 and each listener's MD5 is checked, and the listener is triggered if they differ.
// A full synchronization is needed if at least 5 minutes have passed since the last one
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
for (CacheData cache : cacheMap.get().values()) {
    synchronized (cache) {
        if (cache.isSyncWithServer()) {
            // 1. For configurations that are already synchronized, re-check md5 consistency between cacheData and its listeners
            cache.checkListenerMd5();
            // Less than 5 minutes since the last full synchronization: skip this configuration
            if (!needAllSync) {
                continue;
            }
        }
        // ...
    }
}
// ...
isSyncWithServer = true means that server.md5 == client.cacheData.md5 == client.listeners.md5, that is, the configuration held by the CacheData is already synchronized. As a result, listeners for all CacheData are re-registered only during the full synchronization that happens every five minutes.
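The grouping rule can be condensed into a small standalone sketch. Item is a simplified stand-in for CacheData that keeps only the fields the rule needs; the isUseLocalConfigInfo check and the checkListenerMd5 call are deliberately left out, and all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ListenGrouping {
    /** Simplified stand-in for CacheData. */
    static final class Item {
        final String groupKey;
        final int taskId;
        final int listenerCount;
        final boolean syncWithServer;

        Item(String groupKey, int taskId, int listenerCount, boolean syncWithServer) {
            this.groupKey = groupKey;
            this.taskId = taskId;
            this.listenerCount = listenerCount;
            this.syncWithServer = syncWithServer;
        }
    }

    /** taskId -> items to listen on, and taskId -> items whose listening is removed. */
    static final class Groups {
        final Map<String, List<Item>> listen = new HashMap<>();
        final Map<String, List<Item>> remove = new HashMap<>();
    }

    static Groups group(List<Item> items, boolean needAllSync) {
        Groups groups = new Groups();
        for (Item item : items) {
            // Already consistent with the server: skip unless a full sync is due.
            if (item.syncWithServer && !needAllSync) {
                continue;
            }
            Map<String, List<Item>> target = item.listenerCount > 0 ? groups.listen : groups.remove;
            target.computeIfAbsent(String.valueOf(item.taskId), k -> new ArrayList<>()).add(item);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("a", 0, 1, true),    // synced, has a listener
                new Item("b", 0, 1, false),   // not synced, has a listener
                new Item("c", 1, 0, false));  // not synced, no listener
        Groups incremental = group(items, false);
        System.out.println(incremental.listen.get("0").size()); // 1 (only "b")
        System.out.println(incremental.remove.get("1").size()); // 1 ("c")
        Groups full = group(items, true);
        System.out.println(full.listen.get("0").size());        // 2 ("a" and "b")
    }
}
```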
2. Register to listen to configuration synchronization
For listenCachesMap, call the gRPC interface to register the listener.
// ClientWorker.ConfigRpcTransportClient#executeConfigListen
// 2. Process CacheData that has listeners and send a listen request
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
    String taskId = entry.getKey();
    List<CacheData> listenCaches = entry.getValue();
    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
    configChangeListenRequest.setListen(true);
    try {
        // #1 Each taskId corresponds to one RpcClient
        RpcClient rpcClient = ensureRpcClient(taskId);
        // #2 Register the listeners over gRPC
        ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
                rpcClient, configChangeListenRequest);
        // #3 Process the gRPC response
        if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
            Set<String> changeKeys = new HashSet<String>();
            if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
                hasChangedKeys = true;
                for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                        .getChangedConfigs()) {
                    String changeKey = GroupKey
                            .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
                                    changeConfig.getTenant());
                    changeKeys.add(changeKey);
                    boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
                    // Query the latest configuration, write the snapshot, and refresh the content in CacheData.
                    // Listeners are notified only if the CacheData is not initializing.
                    refreshContentAndCheck(changeKey, !isInitializing);
                }
            }
            // For configurations that did not change, set syncWithServer=true to reduce synchronization overhead
            for (CacheData cacheData : listenCaches) {
                String groupKey = GroupKey
                        .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
                if (!changeKeys.contains(groupKey)) {
                    synchronized (cacheData) {
                        if (!cacheData.getListeners().isEmpty()) {
                            cacheData.setSyncWithServer(true);
                            continue;
                        }
                    }
                }
                // Mark the cacheData as no longer initializing
                cacheData.setInitializing(false);
            }
        }
    } catch (Exception e) {
        // ignore
    }
}
#1: the ensureRpcClient method makes sure that each taskId has its own RpcClient, whereas in 1.4 each taskId had its own LongPollingRunnable task.
// ClientWorker.ConfigRpcTransportClient#ensureRpcClient
// #1 Each taskId corresponds to one RpcClient
private synchronized RpcClient ensureRpcClient(String taskId) throws NacosException {
    Map<String, String> labels = getLabels();
    Map<String, String> newLabels = new HashMap<String, String>(labels);
    newLabels.put("taskId", taskId);
    // One taskId corresponds to one RpcClient
    // Each RpcClient has its own thread pool
    RpcClient rpcClient = RpcClientFactory
            .createClient("config-" + taskId + "-" + uuid, getConnectionType(), newLabels);
    if (rpcClient.isWaitInitiated()) {
        initRpcClientHandler(rpcClient);
        rpcClient.setTenant(getTenant());
        rpcClient.clientAbilities(initAbilities());
        rpcClient.start();
    }
    return rpcClient;
}

// RpcClientFactory#createClient
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
    String clientNameInner = clientName;
    synchronized (clientMap) {
        if (clientMap.get(clientNameInner) == null) {
            RpcClient moduleClient = null;
            if (ConnectionType.GRPC.equals(connectionType)) {
                moduleClient = new GrpcSdkClient(clientNameInner);
            }
            moduleClient.labels(labels);
            clientMap.put(clientNameInner, moduleClient);
            return moduleClient;
        }
        return clientMap.get(clientNameInner);
    }
}
#2: the client then sends the request to port 9848 of the Nacos server to register the listeners. Unlike the 1.4 server, which holds the request, the 2.0 server returns immediately.
// gRPC registers listener #2
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
#3: the ConfigChangeBatchListenResponse returned by the server contains the configuration items whose MD5 has changed. For each of them, the refreshContentAndCheck method queries the latest configuration from the server and updates the snapshot file and the CacheData. getServerConfig works as in 1.4, except that it now uses gRPC; checkListenerMd5, as in 1.4, compares the MD5 in CacheData with the MD5 recorded by each listener and triggers the listener if they differ.
// #3 Process the gRPC response
private void refreshContentAndCheck(String groupKey, boolean notify) {
    if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
        CacheData cache = cacheMap.get().get(groupKey);
        refreshContentAndCheck(cache, notify);
    }
}

// Refresh the configuration in CacheData and notify the listeners
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
    try {
        String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);
        cacheData.setContent(ct[0]);
        if (null != ct[1]) {
            cacheData.setType(ct[1]);
        }
        cacheData.checkListenerMd5();
    } catch (Exception e) {
        LOGGER.error();
    }
}
Because the blocking queue times out after 5 seconds, executeConfigListen is triggered again and again. To avoid the cost of a full synchronization on every run, CacheData whose configuration does not differ between server and client is marked isSyncWithServer=true, meaning the two sides are consistent and the entry can be skipped until the next full synchronization (every 5 minutes).
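The MD5 comparison that decides whether a listener fires can be illustrated with a small standalone sketch. It is not the Nacos CacheData class: it tracks a single listener, and the names Md5ListenerCheck, lastCallMd5, and md5Hex are chosen for this example only:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.function.Consumer;

final class Md5ListenerCheck {
    private final Consumer<String> listener;
    private String content = "";
    private String md5 = md5Hex("");
    // MD5 of the content most recently delivered to the listener
    private String lastCallMd5 = md5;

    Md5ListenerCheck(Consumer<String> listener) {
        this.listener = listener;
    }

    void setContent(String newContent) {
        this.content = newContent;
        this.md5 = md5Hex(newContent);
    }

    /** Fires the listener only if the content MD5 differs from the last delivered one. */
    boolean checkListenerMd5() {
        if (md5.equals(lastCallMd5)) {
            return false;
        }
        listener.accept(content);
        lastCallMd5 = md5;
        return true;
    }

    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Md5ListenerCheck check = new Md5ListenerCheck(c -> System.out.println("changed: " + c));
        check.setContent("timeout=30");
        System.out.println(check.checkListenerMd5()); // prints "changed: timeout=30", then true
        System.out.println(check.checkListenerMd5()); // false: nothing new to deliver
    }
}
```

Comparing digests instead of full content keeps the check cheap, which matters because it runs on every pass of the synchronization loop.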
3. Remove listening
For CacheData without listeners, the same gRPC interface is called, but with the listen parameter of ConfigBatchListenRequest set to false, which removes the listening registration on the server.
// ClientWorker.ConfigRpcTransportClient#executeConfigListen
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
    String taskId = entry.getKey();
    List<CacheData> removeListenCaches = entry.getValue();
    ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
    configChangeListenRequest.setListen(false);
    try {
        RpcClient rpcClient = ensureRpcClient(taskId);
        boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
        if (removeSuccess) {
            for (CacheData cacheData : removeListenCaches) {
                synchronized (cacheData) {
                    if (cacheData.getListeners().isEmpty()) {
                        ClientWorker.this
                                .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                    }
                }
            }
        }
    } catch (Exception e) {
        LOGGER.error("async remove listen config change error ", e);
    }
}
4. Wake up the listening task
You can wake up the configuration synchronization task by putting elements into the blocking queue listenExecutebell.
// ClientWorker.ConfigRpcTransportClient#notifyListenConfig
@Override
public void notifyListenConfig() {
    listenExecutebell.offer(bellItem);
}
The configuration synchronization task is woken up when there are elements in the listenExecutebell blocking queue. So when does the configuration synchronization task wake up?
Scenario 1: while the configuration synchronization task runs, the ConfigChangeBatchListenResponse returned by the server contains changed configurations.
// ClientWorker.ConfigRpcTransportClient#executeConfigListen
public void executeConfigListen() {
    // ...
    // 4. If any configuration changed, trigger executeConfigListen again immediately
    if (hasChangedKeys) {
        notifyListenConfig();
    }
}
Scenario 2: When adding a Listener, you need to synchronize the configuration immediately. Note that the entire CacheData is also marked with syncWithServer=false to enforce configuration synchronization to ensure data consistency on both ends.
// ClientWorker
public void addListeners(String dataId, String group, List<? extends Listener> listeners) {
    group = null2defaultGroup(group);
    CacheData cache = addCacheDataIfAbsent(dataId, group);
    synchronized (cache) {
        for (Listener listener : listeners) {
            cache.addListener(listener);
        }
        cache.setSyncWithServer(false);
        agent.notifyListenConfig();
    }
}
Scenario 3: the server sends a ConfigChangeNotifyRequest to indicate that a configuration has changed and the client needs to synchronize it.
// ClientWorker.ConfigRpcTransportClient#initRpcClientHandler
private void initRpcClientHandler(final RpcClient rpcClientInner) {
    rpcClientInner.registerServerRequestHandler((request) -> {
        if (request instanceof ConfigChangeNotifyRequest) {
            ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
            String groupKey = GroupKey
                    .getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
                            configChangeNotifyRequest.getTenant());
            CacheData cacheData = cacheMap.get().get(groupKey);
            if (cacheData != null) {
                cacheData.setSyncWithServer(false);
                notifyListenConfig();
            }
            return new ConfigChangeNotifyResponse();
        }
        return null;
    });
}
Other scenarios are not listed one by one; in every case, the configuration synchronization task is woken up when the client needs to become aware of a configuration change.
The 2.0 server handles listening requests
On the server side, ConfigChangeBatchListenRequestHandler handles ConfigBatchListenRequest.
@Component
public class ConfigChangeBatchListenRequestHandler extends RequestHandler<ConfigBatchListenRequest, ConfigChangeBatchListenResponse> {
    @Autowired
    private ConfigChangeListenContext configChangeListenContext;

    @Override
    @TpsControl(pointName = "ConfigListen")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public ConfigChangeBatchListenResponse handle(ConfigBatchListenRequest configChangeListenRequest, RequestMeta meta) throws NacosException {
        String connectionId = StringPool.get(meta.getConnectionId());
        String tag = configChangeListenRequest.getHeader(Constants.VIPSERVER_TAG);
        ConfigChangeBatchListenResponse configChangeBatchListenResponse = new ConfigChangeBatchListenResponse();
        for (ConfigBatchListenRequest.ConfigListenContext listenContext : configChangeListenRequest.getConfigListenContexts()) {
            String groupKey = GroupKey2.getKey(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
            groupKey = StringPool.get(groupKey);
            String md5 = StringPool.get(listenContext.getMd5());
            if (configChangeListenRequest.isListen()) {
                // Save the connectionId -> groupKey and groupKey -> md5 relationships on the server
                configChangeListenContext.addListen(groupKey, md5, connectionId);
                // Check whether the configuration has changed. If so, add the changed groupKey to the response
                boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
                if (!isUptoDate) {
                    configChangeBatchListenResponse.addChangeConfig(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
                }
            } else {
                configChangeListenContext.removeListen(groupKey, connectionId);
            }
        }
        return configChangeBatchListenResponse;
    }
}
For a listener registration request, the server saves two mappings in ConfigChangeListenContext: connectionId (long connection) -> groupKey -> client MD5, and groupKey -> set of connectionIds.
The former mapping is for console display only;
The latter mapping is mainly used to find subscribing clients to be notified of configuration items that change later.
@Component
public class ConfigChangeListenContext {
    /**
     * groupKey -> connection set.
     */
    private ConcurrentHashMap<String, HashSet<String>> groupKeyContext = new ConcurrentHashMap<String, HashSet<String>>();

    /**
     * connectionId -> group key set.
     */
    private ConcurrentHashMap<String, HashMap<String, String>> connectionIdContext = new ConcurrentHashMap<String, HashMap<String, String>>();

    public synchronized void addListen(String groupKey, String md5, String connectionId) {
        // 1. add groupKeyContext
        Set<String> listenClients = groupKeyContext.get(groupKey);
        if (listenClients == null) {
            groupKeyContext.putIfAbsent(groupKey, new HashSet<String>());
            listenClients = groupKeyContext.get(groupKey);
        }
        listenClients.add(connectionId);
        // 2. add connectionIdContext
        HashMap<String, String> groupKeys = connectionIdContext.get(connectionId);
        if (groupKeys == null) {
            connectionIdContext.putIfAbsent(connectionId, new HashMap<String, String>(16));
            groupKeys = connectionIdContext.get(connectionId);
        }
        groupKeys.put(groupKey, md5);
    }
}
In addition, similar to 1.4 Long Polling logic, if the MD5 of the client is inconsistent with the MD5 of the server, the server returns the inconsistent groupKey.
// ConfigChangeBatchListenRequestHandler.handle
// Check whether the configuration has changed. If so, add the changed groupKey to the response
boolean isUptoDate = ConfigCacheService.isUptodate(groupKey, md5, meta.getClientIp(), tag);
if (!isUptoDate) {
    configChangeBatchListenResponse.addChangeConfig(listenContext.getDataId(), listenContext.getGroup(), listenContext.getTenant());
}
To remove listening, the server simply deletes the mappings above from ConfigChangeListenContext.
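The add, remove, and lookup operations on the two mappings can be sketched together as follows. This is a simplified illustration under the assumption that removal deletes the pair from both maps and drops empty entries; it is not the actual ConfigChangeListenContext code, and ListenRegistry and its field names are hypothetical:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ListenRegistry {
    // groupKey -> connectionIds listening on it (used to find clients to notify)
    private final Map<String, Set<String>> connectionsByGroupKey = new HashMap<>();
    // connectionId -> (groupKey -> client md5)
    private final Map<String, Map<String, String>> md5ByConnection = new HashMap<>();

    synchronized void addListen(String groupKey, String md5, String connectionId) {
        connectionsByGroupKey.computeIfAbsent(groupKey, k -> new HashSet<>()).add(connectionId);
        md5ByConnection.computeIfAbsent(connectionId, k -> new HashMap<>()).put(groupKey, md5);
    }

    synchronized void removeListen(String groupKey, String connectionId) {
        Set<String> connections = connectionsByGroupKey.get(groupKey);
        if (connections != null) {
            connections.remove(connectionId);
            if (connections.isEmpty()) {
                connectionsByGroupKey.remove(groupKey);
            }
        }
        Map<String, String> groupKeys = md5ByConnection.get(connectionId);
        if (groupKeys != null) {
            groupKeys.remove(groupKey);
            if (groupKeys.isEmpty()) {
                md5ByConnection.remove(connectionId);
            }
        }
    }

    /** Returns a copy so callers can iterate without holding the lock. */
    synchronized Set<String> getListeners(String groupKey) {
        Set<String> connections = connectionsByGroupKey.get(groupKey);
        if (connections == null) {
            return Collections.<String>emptySet();
        }
        return new HashSet<String>(connections);
    }

    public static void main(String[] args) {
        ListenRegistry registry = new ListenRegistry();
        registry.addListen("app.yaml+DEFAULT_GROUP", "md5-1", "conn-1");
        registry.addListen("app.yaml+DEFAULT_GROUP", "md5-1", "conn-2");
        System.out.println(registry.getListeners("app.yaml+DEFAULT_GROUP").size()); // 2
        registry.removeListen("app.yaml+DEFAULT_GROUP", "conn-1");
        System.out.println(registry.getListeners("app.yaml+DEFAULT_GROUP").size()); // 1
    }
}
```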
4. 2.0 server configuration publishing
After the local and memory configurations of the Nacos server are updated, the LocalDataChangeEvent event is published.
In version 1.4, the server handles LocalDataChangeEvent through LongPollingService.
In 2.0 the HTTP long polling logic is still present and has not been removed, but for 2.0 clients the server handles LocalDataChangeEvent with RpcConfigChangeNotifier.
@Component(value = "rpcConfigChangeNotifier")
public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
    public RpcConfigChangeNotifier() {
        NotifyCenter.registerSubscriber(this);
    }

    @Override
    public void onEvent(LocalDataChangeEvent event) {
        String groupKey = event.groupKey;
        boolean isBeta = event.isBeta;
        List<String> betaIps = event.betaIps;
        String[] strings = GroupKey.parseKey(groupKey);
        String dataId = strings[0];
        String group = strings[1];
        String tenant = strings.length > 2 ? strings[2] : "";
        String tag = event.tag;
        configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag);
    }
}
configDataChanged looks up, in the listening context ConfigChangeListenContext, all connectionIds listening on the groupKey, then uses ConnectionManager to get the gRPC long connection (Connection) for each connectionId. Finally, it builds a ConfigChangeNotifyRequest and submits an RpcPushTask to a separate executor so that other event processing is not blocked.
// RpcConfigChangeNotifier
public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta,
        List<String> betaIps, String tag) {
    // From the listening context, get all connectionIds listening on the groupKey
    Set<String> listeners = configChangeListenContext.getListeners(groupKey);
    if (!CollectionUtils.isEmpty(listeners)) {
        for (final String client : listeners) {
            // Get the actual gRPC long connection for the connectionId
            Connection connection = connectionManager.getConnection(client);
            if (connection == null) {
                continue;
            }
            // ...
            // Build the configuration change notification request
            ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
            // To avoid blocking other event processing, submit the task to another thread pool
            RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, connection.getMetaInfo().getAppName());
            push(rpcPushRetryTask);
        }
    }
}
The push method has three branches:
- If the task has been retried more than 50 times, ConnectionManager closes the long connection.
- If the connectionId still exists in ConnectionManager, the task is scheduled. Each time the task fails it is submitted again, with a delay equal to the number of attempts so far x 2 seconds.
- If the connection no longer exists, nothing is done.
// RpcConfigChangeNotifier
private void push(RpcPushTask retryTask) {
    ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest;
    if (retryTask.isOverTimes()) {
        // Retried more than 50 times
        connectionManager.unregister(retryTask.connectionId);
        return;
    } else if (connectionManager.getConnection(retryTask.connectionId) != null) {
        ConfigExecutor.getClientConfigNotifierServiceExecutor()
                .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS);
    } else {
        // The client is already offline, ignore the task.
    }
}
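The three branches and the linear back-off can be isolated into a small decision function. This is a sketch only: the exact boundary check inside isOverTimes is not shown in the article, so the code assumes "more than 50" means tryTimes > 50, and PushRetryPolicy, Action, decide, and delaySeconds are names invented for the example:

```java
final class PushRetryPolicy {
    // Assumption: the retry limit passed to RpcPushTask (50 in the article).
    static final int MAX_RETRIES = 50;

    enum Action { CLOSE_CONNECTION, SCHEDULE, IGNORE }

    /** Mirrors the three branches of push: give up, schedule, or ignore. */
    static Action decide(int tryTimes, boolean connectionAlive) {
        if (tryTimes > MAX_RETRIES) {
            return Action.CLOSE_CONNECTION;
        }
        return connectionAlive ? Action.SCHEDULE : Action.IGNORE;
    }

    /** Linear back-off: the delay grows by 2 seconds with every attempt. */
    static long delaySeconds(int tryTimes) {
        return tryTimes * 2L;
    }

    public static void main(String[] args) {
        System.out.println(decide(0, true) + " after " + delaySeconds(0) + "s");  // SCHEDULE after 0s
        System.out.println(decide(3, true) + " after " + delaySeconds(3) + "s");  // SCHEDULE after 6s
        System.out.println(decide(51, true));                                      // CLOSE_CONNECTION
        System.out.println(decide(1, false));                                      // IGNORE
    }
}
```

The first push has tryTimes 0 and is therefore scheduled without delay; only retries are spaced out.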
RpcPushTask is an inner class of RpcConfigChangeNotifier; its run method implements the fault-tolerance logic:
- If the push is rate-limited, the task is resubmitted.
- If the RPC fails, the task is resubmitted.
class RpcPushTask implements Runnable {
    @Override
    public void run() {
        tryTimes++;
        // If rate-limited, resubmit the task
        if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) {
            push(this);
        } else {
            // Send a ConfigChangeNotifyRequest to the client over gRPC
            rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) {
                @Override
                public void onSuccess() {
                    tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp);
                }

                @Override
                public void onFail(Throwable e) {
                    tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp);
                    // On failure, resubmit the task
                    push(RpcPushTask.this);
                }
            }, ConfigExecutor.getClientConfigNotifierServiceExecutor());
        }
    }
}
For the client logic, see Scenario 3 in 2.4 Waking up a Listening Task.
Conclusion
The main change in the 2.x configuration center is the introduction of long connections in place of short-connection long polling.
Client changes:
Change Point 1:
In 1.x, the client starts one LongPollingRunnable long polling task for every 3000 CacheData entries. In 2.x, the client creates one RpcClient for every 3000 CacheData entries, and each RpcClient establishes a long connection with the server.
Change Point 2:
Added the logic of scheduled full pull configuration on the client.
In 1.x, the Nacos configuration center updates client configuration through long polling only, which from the client's point of view is push-only.
2.x additionally supports periodic synchronization initiated by the client, so 2.x combines push and pull.
Pull: Every five minutes, the client sends a ConfigBatchListenRequest covering all of its CacheData. If the MD5 of any configuration has changed, the client receives the changed configuration items and sends a ConfigQuery request to fetch the current configuration.
Push: When a configuration changes on the server, the server sends a ConfigChangeNotifyRequest over the long connections held by the current node to notify the clients which configuration item has changed.
Server side changes:
Change Point 1:
Because 2.x uses long connections instead of long polling, the ConfigBatchListenRequest will not be held by the server and will be returned immediately. The server simply keeps the listening relationship in memory for subsequent notification.
The mapping from groupKey to connectionId is used to find the clients' long connections for a changed configuration item. The mapping from connectionId to groupKey is for console display only. Both are stored in the server-side singleton ConfigChangeListenContext.
Change Point 2:
Following from change point 1, when a configuration changes, 1.x has to use the groupKey to find the client AsyncContext objects that are still long polling. 2.x uses the groupKey to find the connectionIds, uses each connectionId to find the long connection, and sends a ConfigChangeNotifyRequest over it to notify the client of the configuration change.