In the previous section, we gave an overview of what a distributed configuration center is, how it works, and how it can be implemented. Building on those principles, this article explores how Nacos implements a configuration center.
Preface
The source versions referenced in this article (see also the Nacos official documentation):
- spring-cloud-alibaba-nacos: 2.2.5.RELEASE
- nacos-server: 2.0.2
- nacos-example: master
Review of configuration center principles
In the previous section, we walked through how a configuration center is implemented. The key points are:
- When the client starts, it needs to obtain the configuration from the server and cache it locally.
- The server needs to persist the configuration and provide a visual page for adding, deleting, modifying, and querying the configuration.
- A listening mechanism detects configuration changes on the server so that clients are updated in real time.
The listening mechanism generally works in one of two modes. In pull mode, the client proactively pulls configuration from the server, usually through a scheduled task that requests the server periodically. In push mode, the server actively notifies the client when the configuration changes. Each mode has advantages and disadvantages:
- Pull mode:
  - Advantages: the client controls when and what to pull, and the server does not need to track client connections.
  - Disadvantages: to meet real-time requirements, a scheduled task typically runs every second, which consumes resources.
- Push mode:
  - Advantages: the server notifies the client of configuration changes, so the client does not need to poll, which reduces resource usage.
  - Disadvantages: the server must keep each client's connection information and the configuration it subscribes to.
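As a rough illustration of the pull model, the sketch below asks a config source for the latest content and fires a callback only when it differs from the cached copy. `PullingClient` and its `Supplier`-based source are hypothetical stand-ins, not Nacos classes; scheduling is left out so that a single polling round (`pollOnce`) can be exercised directly.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical pull-mode client: polls a config source and reports changes.
final class PullingClient {
    private final Supplier<String> configSource; // stand-in for a remote fetch
    private final Consumer<String> onChange;      // called only when content differs
    private String lastContent;                   // locally cached copy

    PullingClient(Supplier<String> configSource, Consumer<String> onChange) {
        this.configSource = configSource;
        this.onChange = onChange;
    }

    /** One polling round; returns true if the content changed. */
    boolean pollOnce() {
        String latest = configSource.get();
        if (Objects.equals(latest, lastContent)) {
            return false; // nothing changed: this request was pure overhead
        }
        lastContent = latest;
        onChange.accept(latest);
        return true;
    }
}
```

In practice `pollOnce` would be driven by a scheduler, for example `scheduleWithFixedDelay(client::pollOnce, 0, 1, TimeUnit.SECONDS)` on a `ScheduledExecutorService`; every round that returns `false` is the resource cost that the pull-mode disadvantage refers to.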
Basic use and principle of Nacos
Nacos introduction
For an introduction to Nacos, I recommend checking the Nacos website for details.
Using Nacos
Prerequisites
Download Nacos and start the Nacos server; see the Nacos Quick Start for details.
1. Add the dependency to your Spring Cloud application:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
Note: version 2.1.x.RELEASE corresponds to Spring Boot 2.1.x, version 2.2.x.RELEASE to Spring Boot 2.2.x, and version 1.5.x.RELEASE to Spring Boot 1.5.x.
For more version mappings, see the Version Description wiki.
2. Configure the Nacos server address and the application name in bootstrap.properties:
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.application.name=example
Note: spring.application.name must be configured because it forms part of the dataId field in Nacos configuration management.
In Nacos Spring Cloud, the full format of dataId is as follows:
${prefix}-${spring.profiles.active}.${file-extension}
- prefix defaults to the value of spring.application.name, but can be configured via spring.cloud.nacos.config.prefix.
- spring.profiles.active is the profile of the current environment; see the Spring Boot documentation for details. Note: when spring.profiles.active is empty, the connecting - is omitted as well, and the dataId format becomes ${prefix}.${file-extension}.
- file-extension is the data format of the configuration content and can be configured via spring.cloud.nacos.config.file-extension. Currently only the properties and yaml types are supported.
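The naming rule can be captured in a few lines. The hypothetical helper below (not part of Nacos) builds the dataIds an application would look up: the base `${prefix}.${file-extension}` first, then one `${prefix}-${profile}.${file-extension}` per active profile, which is the same order loadApplicationConfiguration uses in the source walkthrough.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the dataId naming rule (not a Nacos API).
final class DataIds {
    private DataIds() {}

    static List<String> candidates(String prefix, List<String> activeProfiles, String fileExtension) {
        List<String> ids = new ArrayList<>();
        // Base dataId: always present, even when spring.profiles.active is empty.
        ids.add(prefix + "." + fileExtension);
        for (String profile : activeProfiles) {
            // Profile-specific dataId, joined to the prefix with "-".
            ids.add(prefix + "-" + profile + "." + fileExtension);
        }
        return ids;
    }
}
```

With `spring.application.name=example` and the dev profile active, this yields `example.properties` and `example-dev.properties`.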
3. Enable automatic configuration updates with the Spring Cloud native annotation @RefreshScope:
@RestController
@RequestMapping("/config")
@RefreshScope
public class ConfigController {
@Value("${useLocalCache:false}")
private boolean useLocalCache;
@RequestMapping("/get")
public boolean get() {
return useLocalCache;
}
}
This makes it easy to use Nacos as the configuration center for Spring Cloud projects.
Note: this section is based on the official Nacos documentation.
Source code analysis of how Nacos works
1. Load the server configuration and cache it locally
Spring has two important classes for externalized configuration: org.springframework.core.env.Environment and org.springframework.core.env.PropertySource. All externalized configuration is used by loading a PropertySource into the Environment. Nacos is no exception: it loads its externalized configuration by adding a Nacos PropertySource to the Environment.
PropertySourceBootstrapConfiguration
Spring Cloud provides the configuration class org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration, which loads the bootstrap-level configuration.
org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration#initialize implements the ApplicationContextInitializer interface, so it is called back while the Spring application context is being initialized, before the context is refreshed.
// Automatically inject all PropertySourceLocator
@Autowired(required = false)
private List<PropertySourceLocator> propertySourceLocators = new ArrayList<>();
@Override
public void initialize(ConfigurableApplicationContext applicationContext) {
CompositePropertySource composite = new CompositePropertySource(
BOOTSTRAP_PROPERTY_SOURCE_NAME);
// Sort the propertySourceLocators array
AnnotationAwareOrderComparator.sort(this.propertySourceLocators);
boolean empty = true;
// Get the environment in the current context
ConfigurableEnvironment environment = applicationContext.getEnvironment();
for (PropertySourceLocator locator : this.propertySourceLocators) {
PropertySource<?> source = null;
// Call back all locate methods
source = locator.locate(environment);
if (source == null) {
continue;
}
logger.info("Located property source: " + source);
// Add a non-empty PropertySource to the composite
composite.addPropertySource(source);
empty = false;
}
if (!empty) {
// Get PropertySources in the environment
MutablePropertySources propertySources = environment.getPropertySources();
String logConfig = environment.resolvePlaceholders("${logging.config:}");
LogFile logFile = LogFile.get(environment);
if (propertySources.contains(BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
propertySources.remove(BOOTSTRAP_PROPERTY_SOURCE_NAME);
}
// Add the non-empty composite to the environment
insertPropertySources(propertySources, composite);
reinitializeLoggingSystem(environment, logConfig, logFile);
setLogLevels(applicationContext, environment);
handleIncludedProfiles(environment);
}
}
As you can see, each PropertySource is ultimately returned by locator.locate(environment). So who implements locate(environment)? Nacos must implement it and return its own PropertySource here.
NacosPropertySourceLocator
Following the locator.locate(environment) call, we arrive at org.springframework.cloud.alibaba.nacos.client.NacosPropertySourceLocator#locate.
public class NacosPropertySourceLocator implements PropertySourceLocator {
// ... some code omitted ...
@Override
public PropertySource<?> locate(Environment env) {
// Get the ConfigService used to talk to the server
ConfigService configService = nacosConfigProperties.configServiceInstance();
if (null == configService) {
LOGGER.warn(
"no instance of config service found, can't load config from nacos");
return null;
}
long timeout = nacosConfigProperties.getTimeout();
// Initialize NacosPropertySourceBuilder
nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
timeout);
String name = nacosConfigProperties.getName();
// Get some configuration of nacosConfig.
String nacosGroup = nacosConfigProperties.getGroup();
String dataIdPrefix = nacosConfigProperties.getPrefix();
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = name;
}
if (StringUtils.isEmpty(dataIdPrefix)) {
dataIdPrefix = env.getProperty("spring.application.name");
}
List<String> profiles = Arrays.asList(env.getActiveProfiles());
nacosConfigProperties.setActiveProfiles(profiles.toArray(new String[0]));
String fileExtension = nacosConfigProperties.getFileExtension();
CompositePropertySource composite = new CompositePropertySource(
NACOS_PROPERTY_SOURCE_NAME);
// Load the shared configuration
loadSharedConfiguration(composite);
// Load the extension configuration
loadExtConfiguration(composite);
// Load the application configuration
loadApplicationConfiguration(composite, nacosGroup, dataIdPrefix, fileExtension);
return composite;
}
}
NacosPropertySourceLocator implements PropertySourceLocator, and org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration injects all PropertySourceLocator beans and calls them in a loop. It follows that if we wanted to build our own configuration center, we would need to provide a corresponding PropertySourceLocator implementation.
Here NacosPropertySourceLocator loads three kinds of configuration: shared configuration, extension configuration, and the application's own configuration.
loadApplicationConfiguration
Whatever is being loaded (shared, extension, or application configuration), it is ultimately fetched from the remote server; only the parameters passed in differ. Here we follow the application configuration.
private void loadApplicationConfiguration( CompositePropertySource compositePropertySource, String nacosGroup, String dataIdPrefix, String fileExtension) {
// Load the application configuration. The dataIdPrefix, if not specified, is spring.application.name.
// e.g. example.properties, the dataId configured on the server
loadNacosDataIfPresent(compositePropertySource,
dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
// Load the profile-specific configuration. Each active profile is loaded in turn, e.g. example-dev.properties and example-prod.properties
for (String profile : nacosConfigProperties.getActiveProfiles()) {
String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
fileExtension, true);
}
}
The loading is again delegated to loadNacosDataIfPresent, so let's continue there.
loadNacosDataIfPresent
private void loadNacosDataIfPresent(final CompositePropertySource composite,
final String dataId, final String group, String fileExtension,
boolean isRefreshable) {
// On the initial load no refresh has happened yet (loadCount is 0), so go straight to the else branch
if (NacosContextRefresher.loadCount.get() != 0) {
NacosPropertySource ps;
if (!isRefreshable) {
ps = NacosPropertySourceRepository.getNacosPropertySource(dataId);
}
else {
ps = nacosPropertySourceBuilder.build(dataId, group, fileExtension, true);
}
composite.addFirstPropertySource(ps);
}
else {
// Build the NacosPropertySource and add it to the front of the composite
NacosPropertySource ps = nacosPropertySourceBuilder.build(dataId, group, fileExtension, isRefreshable);
composite.addFirstPropertySource(ps);
}
}
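Because every call uses addFirstPropertySource, the source loaded last ends up at the front of the composite. Assuming lookups return the first match (which is how Spring's CompositePropertySource resolves a property), and given that locate loads shared, then extension, then application configuration (base dataId before profile dataIds), the effective precedence is profile-specific > base application > extension > shared. The model below is a simplified stand-in built on plain maps, not Spring's classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Simplified model of a composite property source with addFirst semantics.
final class CompositeModel {
    private final Deque<Map<String, String>> sources = new ArrayDeque<>();

    /** Newly loaded sources go to the front, like addFirstPropertySource. */
    void addFirst(Map<String, String> source) {
        sources.addFirst(source);
    }

    /** Returns the value from the first (most recently added) source that has the key. */
    String getProperty(String key) {
        for (Map<String, String> source : sources) { // iterates front to back
            if (source.containsKey(key)) {
                return source.get(key);
            }
        }
        return null;
    }
}
```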
nacosPropertySourceBuilder.build
NacosPropertySource build(String dataId, String group, String fileExtension,
boolean isRefreshable) {
// Get the configuration data from the remote server
Properties p = loadNacosData(dataId, group, fileExtension);
if (p == null) {
p = EMPTY_PROPERTIES;
}
// Assemble NacosPropertySource from existing data and return.
NacosPropertySource nacosPropertySource = new NacosPropertySource(group, dataId,
propertiesToMap(p), new Date(), isRefreshable);
NacosPropertySourceRepository.collectNacosPropertySources(nacosPropertySource);
return nacosPropertySource;
}
Here we finally see how the data is fetched from the server.
loadNacosData
private Properties loadNacosData(String dataId, String group, String fileExtension) {
String data = null;
try {
// Get remote data from the server
data = configService.getConfig(dataId, group, timeout);
if (!StringUtils.isEmpty(data)) {
LOGGER.info(String.format("Loading nacos data, dataId: '%s', group: '%s'",
dataId, group));
// Parse data differently according to the extension
if (fileExtension.equalsIgnoreCase("properties")) {
Properties properties = new Properties();
properties.load(new StringReader(data));
return properties;
}
else if (fileExtension.equalsIgnoreCase("yaml")
|| fileExtension.equalsIgnoreCase("yml")) {
YamlPropertiesFactoryBean yamlFactory = new YamlPropertiesFactoryBean();
yamlFactory.setResources(new ByteArrayResource(data.getBytes()));
return yamlFactory.getObject();
}
}
}
catch (NacosException e) {
LOGGER.error("get data from Nacos error,dataId:{}, ", dataId, e);
}
catch (Exception e) {
LOGGER.error("parse data from Nacos error,dataId:{},data:{},", dataId, data,
e);
}
return null;
}
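The `properties` branch of loadNacosData relies only on the JDK, so it is easy to reproduce in isolation. The sketch below is a simplified, hypothetical version of that branch; the YAML branch needs Spring's YamlPropertiesFactoryBean and SnakeYAML, so it is intentionally left out.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Simplified sketch of the "properties" branch of loadNacosData.
final class ConfigParser {
    private ConfigParser() {}

    static Properties parse(String data, String fileExtension) {
        if (data == null || data.isEmpty()) {
            return null; // like loadNacosData, empty content yields null
        }
        if (fileExtension.equalsIgnoreCase("properties")) {
            Properties properties = new Properties();
            try {
                properties.load(new StringReader(data));
            } catch (IOException e) {
                // StringReader does no real I/O, but load() declares IOException
                throw new UncheckedIOException(e);
            }
            return properties;
        }
        return null; // unsupported extensions fall through to null, as in the original
    }
}
```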
getConfig & getConfigInner
@Override
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
Here com.alibaba.nacos.client.config.NacosConfigService#getConfig simply delegates to getConfigInner, so let's look at getConfigInner directly.
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = null2defaultGroup(group);
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// The local failover configuration takes priority
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
if (content != null) {
log.warn(agent.getName(), "[get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", dataId,
group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
// If the local configuration is null, the configuration information is obtained from the server
content = worker.getServerConfig(dataId, group, tenant, timeoutMs);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
log.warn("NACOS-0003",
LoggerHelper.getErrorCodeStr("NACOS", "NACOS-0003", "Environmental issues", "get from server error"));
log.warn(agent.getName(), "[get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
dataId, group, tenant, ioe.toString());
}
log.warn(agent.getName(), "[get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", dataId,
group, tenant, ContentUtils.truncateContent(content));
// If an exception occurs, load the configuration using the local snapshot file
content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
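Stripped of logging and the filter chain, getConfigInner tries three sources in order: the failover file, the server, and finally the local snapshot. The sketch below keeps only that control flow. The three suppliers are hypothetical stand-ins for LocalConfigInfoProcessor.getFailover, worker.getServerConfig and LocalConfigInfoProcessor.getSnapshot; unlike the real method, it falls back to the snapshot for every server error, whereas Nacos rethrows NO_RIGHT errors.

```java
import java.util.function.Supplier;

// Hypothetical sketch of getConfigInner's lookup order.
final class ConfigLookup {
    private ConfigLookup() {}

    static String getConfig(Supplier<String> failover,
                            Supplier<String> server,
                            Supplier<String> snapshot) {
        String content = failover.get();
        if (content != null) {
            return content; // a failover file always wins
        }
        try {
            return server.get(); // may legitimately be null, e.g. after a 404
        } catch (RuntimeException serverError) {
            // Simplification: Nacos rethrows NO_RIGHT errors instead of falling back.
            return snapshot.get();
        }
    }
}
```

Note that a `null` from the server (config not found) is returned as-is; only a failed call falls back to the snapshot.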
worker.getServerConfig
com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig
public String getServerConfig(String dataId, String group, String tenant, long readTimeout)
throws NacosException {
if (StringUtils.isBlank(group)) {
group = Constants.DEFAULT_GROUP;
}
HttpResult result = null;
try {
List<String> params = null;
if (StringUtils.isBlank(tenant)) {
params = Arrays.asList("dataId", dataId, "group", group);
} else {
params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant);
}
// Request the server to obtain the configuration through HTTP
result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
} catch (IOException e) {
log.error(agent.getName(), "NACOS-XXXX", "[sub-server] get server config exception, dataId={}, group={}, tenant={}, msg={}", dataId, group,
tenant, e.toString());
throw new NacosException(NacosException.SERVER_ERROR, e.getMessage());
}
switch (result.code) {
// check return code. If there is a normal response, the content is saved as snapshot and the configuration content is returned
case HttpURLConnection.HTTP_OK:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content);
return result.content;
// check return code. If there is a 404 response, set snapshot to empty and return empty content
case HttpURLConnection.HTTP_NOT_FOUND:
LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
return null;
// Other exceptions.
case HttpURLConnection.HTTP_CONFLICT: {
log.error(agent.getName(), "NACOS-XXXX", "[sub-server-error] get server config being modified concurrently, dataId={}, group={}, tenant={}",
dataId, group, tenant);
throw new NacosException(NacosException.CONFLICT,
"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
case HttpURLConnection.HTTP_FORBIDDEN: {
log.error(agent.getName(), "NACOS-XXXX", "[sub-server-error] no right, dataId={}, group={}, tenant={}",
dataId, group, tenant);
throw new NacosException(result.code, result.content);
}
default: {
log.error(agent.getName(), "NACOS-XXXX", "[sub-server-error] dataId={}, group={}, tenant={}, code={}",
dataId, group, tenant, result.code);
throw new NacosException(result.code,
"http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
}
}
}
As you can see, getServerConfig sends an HTTP GET request to /v1/cs/configs on the server, whose handler returns the configuration the client asked for. We won't look at the server-side implementation here; what matters is that the client now has the configuration content. Unwinding back to where we started, org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration#initialize, the configuration returned by the Nacos server has now been loaded into the Environment, so our Spring Cloud application can read the configuration added in Nacos.
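For reference, the request issued by getServerConfig can be pictured as a plain URL. This hypothetical helper assumes a base address such as `http://127.0.0.1:8848/nacos` (port 8848 as in the bootstrap.properties example, plus the default `/nacos` context path) and adds `tenant` only when a namespace is set, mirroring the params list in getServerConfig. It needs Java 11+ for `String.isBlank`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical builder for the GET request sent to /v1/cs/configs.
final class ConfigUrl {
    private ConfigUrl() {}

    static String build(String baseUrl, String dataId, String group, String tenant) {
        StringBuilder sb = new StringBuilder(baseUrl)
                .append("/v1/cs/configs")
                .append("?dataId=").append(enc(dataId))
                .append("&group=").append(enc(group));
        if (tenant != null && !tenant.isBlank()) {
            sb.append("&tenant=").append(enc(tenant)); // namespace, only when set
        }
        return sb.toString();
    }

    private static String enc(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

Under the same assumptions, the equivalent manual request would be `curl "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=example.properties&group=DEFAULT_GROUP"`.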
2. The server persists the configuration and performs operations on the configuration
As for the second point, Nacos generally persists configuration in a MySQL database. This essentially comes down to create, read, update, and delete operations on MySQL data, so we won't go into detail here. If you are interested, see the official source code; com.alibaba.nacos.config.server.controller.ConfigController#getConfig is a good entry point.
3. Dynamic awareness of configuration changes on the client
The client
NacosConfigService
When NacosConfigService is instantiated, its constructor com.alibaba.nacos.client.config.NacosConfigService#NacosConfigService does the following:
- It initializes the httpAgent, whose actual implementation class is ServerHttpAgent; the agent is mainly used to connect to the server.
- It initializes the ClientWorker, passing the httpAgent in as a parameter.
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
String namespaceTmp = properties.getProperty(PropertyKeyConst.NAMESPACE);
if (StringUtils.isBlank(namespaceTmp)) {
namespace = TenantUtil.getUserTenant();
properties.put(PropertyKeyConst.NAMESPACE, namespace);
} else {
namespace = namespaceTmp;
properties.put(PropertyKeyConst.NAMESPACE, namespace);
}
agent = new ServerHttpAgent(properties);
agent.start();
worker = new ClientWorker(agent, configFilterChainManager);
}
ClientWorker
Here you can see that ClientWorker keeps the httpAgent internally and creates two thread pools:
public ClientWorker(final ServerHttpAgent agent, final ConfigFilterChainManager configFilterChainManager) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize a scheduled thread pool
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
// Initialize a cached thread pool; judging by the thread name, it is used for long polling against the server
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPulling" + agent.getName());
t.setDaemon(true);
return t;
}
});
// Schedule checkConfigInfo() with an initial delay of 1 ms and a fixed delay of 10 ms
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
log.error(agent.getName(), "NACOS-XXXX", "[sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
Here we see two thread pools being initialized:
- The first is a scheduled thread pool that runs checkConfigInfo() every 10 ms, measured from the end of one run to the start of the next.
- The second is a cached thread pool that, judging by its thread name, is used for long polling against the server.
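The same two-pool layout can be written more compactly with lambdas. This is a sketch under assumptions: the `WorkerPools` helper and the thread names are hypothetical, and the scheduled task is whatever `Runnable` you pass in (checkConfigInfo in ClientWorker). `scheduleWithFixedDelay` waits 10 ms after each run completes, so runs never overlap, unlike `scheduleAtFixedRate`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical rewrite of ClientWorker's two pools.
final class WorkerPools {
    private WorkerPools() {}

    /** Daemon threads do not keep the JVM alive just for config polling. */
    static ThreadFactory daemonFactory(String name) {
        return r -> {
            Thread t = new Thread(r);
            t.setName(name);
            t.setDaemon(true);
            return t;
        };
    }

    /** Single-threaded scheduler: each run starts 10 ms after the previous one ends. */
    static ScheduledExecutorService startChecker(Runnable check) {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, daemonFactory("config-checker"));
        executor.scheduleWithFixedDelay(check, 1L, 10L, TimeUnit.MILLISECONDS);
        return executor;
    }

    /** Cached pool for long-running long-polling tasks; grows as tasks are added. */
    static ExecutorService longPollingPool() {
        return Executors.newCachedThreadPool(daemonFactory("config-long-polling"));
    }
}
```

A cached pool suits long polling because each LongPullingRunnable occupies a thread for the duration of its request and then resubmits itself.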
checkConfigInfo
Let's continue into checkConfigInfo:
AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(new HashMap<String, CacheData>());
public void checkConfigInfo() {
// Dispatch tasks
int listenerSize = cacheMap.get().size();
// Round up to get the number of batches
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// Deciding whether a task is already running needs careful thought. The task list is currently unordered, so problems may arise while it changes
executorService.execute(new LongPullingRunnable(i));
}
// Update the number of started tasks
currentLongingTaskCount = longingTaskCount;
}
}
Based on the size of cacheMap, the cached entries are split across multiple tasks. By default, each long-polling LongPullingRunnable task handles a batch of 3000 listened configurations, so more than 3000 entries require additional tasks.
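The batching arithmetic is worth spelling out. The sketch assumes, as in the Nacos 1.x ParamUtil, that the per-task size is a `double` (3000 by default), so the division keeps its fraction and `Math.ceil` rounds partial batches up. `taskIdFor` is a hypothetical helper showing how an entry's position maps to a batch; it is not a Nacos method.

```java
// Hypothetical sketch of checkConfigInfo's batch sizing.
final class TaskBatches {
    static final double PER_TASK_CONFIG_SIZE = 3000; // assumed default, as a double

    private TaskBatches() {}

    /** Number of LongPullingRunnable tasks needed for the given cache size. */
    static int taskCount(int listenerSize) {
        // Floating-point division keeps the fraction, so ceil rounds 3001/3000 up to 2.
        return (int) Math.ceil(listenerSize / PER_TASK_CONFIG_SIZE);
    }

    /** Batch (taskId) that the entry at the given position falls into. */
    static int taskIdFor(int index) {
        return (int) (index / PER_TASK_CONFIG_SIZE);
    }
}
```

With an `int` divisor, `3001 / 3000` would truncate to 1 before `ceil` ran, so no task would be started for the entry in batch 1; that is why the divisor must be floating point.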
LongPullingRunnable#run()
class LongPullingRunnable implements Runnable {
private int taskId;
public LongPullingRunnable(int taskId) {
this.taskId = taskId;
}
public void run() {
try {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
// check failover config
for (CacheData cacheData : cacheMap.get().values()) {
// taskId identifies which batch of cacheMap this task handles
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
// Compare the local failover file with the cached data in cacheMap to determine whether the data has changed
checkLocalConfig(cacheData);
// If the local failover config is in use, notify listeners whose MD5 has changed
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
log.error("NACOS-CLIENT", "get local config info error", e);
}
}
}
// ... omit some code
} catch (Throwable e) {
log.error("500", "longPulling error", e);
} finally {
executorService.execute(this);
}
}
}
In the checkConfigInfo loop, each LongPullingRunnable receives a taskId, which distinguishes the batches handled when several LongPullingRunnable tasks are running.
The checkLocalConfig method is used to determine whether the data has changed.
checkLocalConfig
private void checkLocalConfig(CacheData cacheData) {
final String dataId = cacheData.dataId;
final String group = cacheData.group;
final String tenant = cacheData.tenant;
File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
// No -> yes
if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
log.warn(agent.getName(),
"[failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
// Yes -> no. Do not notify the business listeners here; they are notified after the configuration is fetched from the server.
if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
cacheData.setUseLocalConfigInfo(false);
log.warn(agent.getName(), "[failover-change] failover file deleted. dataId={}, group={}, tenant={}", dataId,
group, tenant);
return;
}
// Yes, and the file has changed
if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
log.warn(agent.getName(),
"[failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, md5, ContentUtils.truncateContent(content));
return;
}
}
The local configuration check covers three cases:
- If isUseLocalConfigInfo is false but a local failover file exists, set isUseLocalConfigInfo to true and update cacheData's content and its content version (the file's last-modified time).
- If isUseLocalConfigInfo is true but the local file no longer exists, set it to false without notifying listeners.
- If isUseLocalConfigInfo is true and the local file exists, but cacheData's content version differs from the file's last-modified time, update cacheData and keep isUseLocalConfigInfo set to true.
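The three cases form a small state machine. The hypothetical model below (not the Nacos class) replaces the file I/O with two parameters, `fileExists` and `lastModified`, and reports which transition happened, so the branches of checkLocalConfig can be checked in isolation.

```java
// Hypothetical model of the three branches in checkLocalConfig.
final class FailoverState {
    enum Change { NONE, CREATED, DELETED, MODIFIED }

    boolean useLocalConfigInfo;  // is the failover file currently in use?
    long localConfigInfoVersion; // lastModified of the file version we loaded

    Change check(boolean fileExists, long lastModified) {
        if (!useLocalConfigInfo && fileExists) {             // no -> yes
            useLocalConfigInfo = true;
            localConfigInfoVersion = lastModified;
            return Change.CREATED;
        }
        if (useLocalConfigInfo && !fileExists) {             // yes -> no
            useLocalConfigInfo = false;                       // listeners are not notified here
            return Change.DELETED;
        }
        if (useLocalConfigInfo && fileExists
                && localConfigInfoVersion != lastModified) { // file was edited
            localConfigInfoVersion = lastModified;
            return Change.MODIFIED;
        }
        return Change.NONE;
    }
}
```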
checkListenerMd5
Back in com.alibaba.nacos.client.config.impl.ClientWorker.LongPullingRunnable#run:
try {
// Compare the cached data in the local file with the cached data in the cacheMap to determine whether the data has changed
checkLocalConfig(cacheData);
// If the local failover config is in use, notify listeners whose MD5 has changed
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
log.error("NACOS-CLIENT", "get local config info error", e);
}
When the local failover config is in use, checkListenerMd5 is called. Let's look at checkListenerMd5:
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
It iterates over all listeners and notifies each one whose last-notified MD5 (lastCallMd5) differs from the current MD5 of the data.
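The MD5 bookkeeping can be modeled with the JDK alone. In this hypothetical sketch (not the Nacos CacheData class), each listener remembers the MD5 it was last notified with and is called again only when the content's MD5 differs. It assumes, as CacheData#addListener appears to do, that a new listener starts at the current MD5 so it is not notified immediately.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of CacheData#checkListenerMd5.
final class CacheDataModel {
    private static final class ListenerWrap {
        final Consumer<String> listener;
        String lastCallMd5; // MD5 of the content this listener last saw

        ListenerWrap(Consumer<String> listener, String lastCallMd5) {
            this.listener = listener;
            this.lastCallMd5 = lastCallMd5;
        }
    }

    private final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex(content);

    void addListener(Consumer<String> listener) {
        // Assumption: start at the current MD5 so the listener is not notified right away.
        listeners.add(new ListenerWrap(listener, md5));
    }

    void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.listener.accept(content);
                wrap.lastCallMd5 = md5; // never notify twice for the same content
            }
        }
    }

    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b)); // zero-padded lowercase hex
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

One design detail: NacosContextRefresher builds its MD5 string with `BigInteger.toString(16)`, which drops leading zeros, so its strings can differ from zero-padded hex. That only matters if MD5 strings from the two schemes are compared with each other.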
safeNotifyListener
private void safeNotifyListener(final String dataId, final String group, final String content,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
public void run() {
//... omit some code
// Notify the listener of the configuration change
listener.receiveConfigInfo(contentTmp);
listenerWrap.lastCallMd5 = md5;
}
};
// ... omit some code
}
receiveConfigInfo
The corresponding implementation is the listener registered by NacosContextRefresher:
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// many Spring context
if (this.ready.compareAndSet(false, true)) {
this.registerNacosListenersForApplications();
}
}
private void registerNacosListenersForApplications() {
//... omit some code
registerNacosListener(nacosPropertySource.getGroup(), dataId);
}
private void registerNacosListener(final String group, final String dataId) {
Listener listener = listenerMap.computeIfAbsent(dataId, i -> new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
loadCount.incrementAndGet();
String md5 = "";
if (!StringUtils.isEmpty(configInfo)) {
try {
MessageDigest md = MessageDigest.getInstance("MD5");
md5 = new BigInteger(1, md.digest(configInfo.getBytes("UTF-8")))
.toString(16);
}
catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
LOGGER.warn("[Nacos] unable to get md5 for dataId: " + dataId, e);
}
}
refreshHistory.add(dataId, md5);
// Send RefreshEvent notification.
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Refresh Nacos config group{},dataId{}", group, dataId);
}
}
@Override
public Executor getExecutor() {
return null;
}
});
try {
configService.addListener(dataId, group, listener);
}
catch (NacosException e) {
e.printStackTrace();
}
}
As you can see, registerNacosListener is invoked from onApplicationEvent (the callback fired when the Spring application is ready). When the configuration changes, a RefreshEvent is published to notify the application, which is how the @RefreshScope annotation takes effect.
Check the server configuration
Back in com.alibaba.nacos.client.config.impl.ClientWorker.LongPullingRunnable#run: the task first reads and checks the local configuration to determine whether the data has changed and, if so, sends notifications (cacheData.checkListenerMd5()).
Next, the current thread asks the remote server for the latest data and checks which configurations have changed:
List<String> inInitializingCacheList = new ArrayList<String>();
// check server config
// Get the changed dataId list from the server
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
// For each changed groupKey, fetch its latest content from the server
String content = getServerConfig(dataId, group, tenant, 3000L);
// Store the fetched content in the corresponding CacheData
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);
log.info(agent.getName(), "[data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content));
} catch (NacosException ioe) {
log.error(agent.getName(), "NACOS-XXXX", "[get-update] get changed config exception. dataId={}, group={}, tenant={}, msg={}", dataId, group, tenant, ioe.toString());
}
}
// Iterate over all cacheData and notify listeners of data that has changed
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
The main steps are:
- Call checkUpdateDataIds to get the list of dataIds whose data has changed on the server.
- Iterate over the changed dataIds and call getServerConfig to fetch the corresponding content.
- Update the local CacheData with the fetched content.
- Finally, iterate over all cacheData and notify listeners of data that has changed.
getServerConfig and checkListenerMd5 were covered earlier and behave the same way here, so we skip them. Let's look at checkUpdateDataIds:
checkUpdateDataIds
It first collects the cacheData entries whose isUseLocalConfigInfo is false, assembles them into the request parameter, and then calls checkUpdateConfigStr:
/**
 * Get the list of changed DataIDs from the server. Only the dataId and group of the returned objects are valid. Guaranteed never to return NULL.
 */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) {
StringBuilder sb = new StringBuilder();
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isUseLocalConfigInfo()) {
sb.append(cacheData.dataId).append(WORD_SEPARATOR);
sb.append(cacheData.group).append(WORD_SEPARATOR);
if (StringUtils.isBlank(cacheData.tenant)) {
sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
} else {
sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
}
if (cacheData.isInitializing()) {
// cacheData appears in cacheMap for the first time & this is its first update check
inInitializingCacheList
.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
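To make the probe format concrete, the sketch below rebuilds the same string from a list of listened configurations. The separator values (character 2 between fields, character 1 between entries) reflect our reading of Nacos's Constants.WORD_SEPARATOR and Constants.LINE_SEPARATOR and should be treated as an assumption; ProbeStringBuilder and Entry are hypothetical names, and the isUseLocalConfigInfo filter is left out.

```java
import java.util.List;

class ProbeStringBuilder {
    // Assumed separator values, mirroring Nacos's WORD_SEPARATOR / LINE_SEPARATOR constants.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    // Hypothetical holder for one listened configuration.
    static final class Entry {
        final String dataId, group, md5, tenant;
        Entry(String dataId, String group, String md5, String tenant) {
            this.dataId = dataId;
            this.group = group;
            this.md5 = md5;
            this.tenant = tenant;
        }
    }

    // One line per config: dataId WS group WS md5 [WS tenant] LS
    static String build(List<Entry> entries) {
        StringBuilder sb = new StringBuilder();
        for (Entry e : entries) {
            sb.append(e.dataId).append(WORD_SEPARATOR);
            sb.append(e.group).append(WORD_SEPARATOR);
            if (e.tenant == null || e.tenant.trim().isEmpty()) {
                sb.append(e.md5).append(LINE_SEPARATOR);
            } else {
                sb.append(e.md5).append(WORD_SEPARATOR);
                sb.append(e.tenant).append(LINE_SEPARATOR);
            }
        }
        return sb.toString();
    }
}
```

A config without a tenant produces three fields per line; with a tenant, the tenant is appended as a fourth field.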
checkUpdateConfigStr
/** * Get the list of DataID values changed from Server. Only dataId and Group of the returned objects are valid. Ensure that NULL is not returned. */
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) {
List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
// The timeout period
long timeout = TimeUnit.SECONDS.toMillis(30L);
// Set the request header
List<String> headers = new ArrayList<String>(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
// Send an HTTP request, request /listener interface
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
// If the server returns normal, the dataId is parsed from the returned request and returned.
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
if (result.code == HttpURLConnection.HTTP_INTERNAL_ERROR) {
log.error("NACOS-0007", LoggerHelper.getErrorCodeStr("Nacos", "Nacos-0007", "Environmental issues", "[check-update] get changed dataId error"));
}
log.error(agent.getName(), "NACOS-XXXX", "[check-update] get changed dataId error, code={}", result.code);
}
} catch (IOException e) {
setHealthServer(false);
log.error(agent.getName(), "NACOS-XXXX", "[check-update] get changed dataId exception, msg={}",
e.toString());
}
return Collections.emptyList();
}
Note that the client sets a 30-second timeout when it calls the /listener endpoint. The server returns only the dataIds whose data has changed, not the configuration content itself.
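The client then has to turn the /listener response body back into group keys, which is the job of parseUpdateDataIdResponse. Here is a hedged sketch, assuming the body is URL-encoded and uses the same separators as the request (fields split by character 2, entries by character 1), with a dataId, a group, and an optional tenant per entry. ChangedKeyParser and the '+'-joined key format are illustrative simplifications; Nacos's own GroupKey helper may also escape special characters.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ChangedKeyParser {
    // Assumed separators, the same as in the request probe string.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    // Returns "dataId+group" or "dataId+group+tenant" for each changed config; never null.
    static List<String> parse(String body) {
        if (body == null || body.isEmpty()) {
            return Collections.emptyList();
        }
        String decoded;
        try {
            decoded = URLDecoder.decode(body, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
        List<String> keys = new ArrayList<>();
        for (String line : decoded.split(LINE_SEPARATOR)) {
            if (line.isEmpty()) {
                continue;
            }
            String[] parts = line.split(WORD_SEPARATOR, -1);
            if (parts.length == 2 || parts.length == 3) {
                keys.add(String.join("+", parts));
            }
        }
        return keys;
    }
}
```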
What does the server do when it receives this request?
The server side
ConfigController#listener
In com.alibaba.nacos.config.server.controller.ConfigController#listener, you can see that the request is ultimately handled by inner.doPollingConfig.
/** * The client listens for configuration changes. */
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
LOGGER.warn("invalid probeModify is blank");
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map<String, String> clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
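MD5Util.getClientMd5Map reverses the client's encoding into a map from group key to the client's MD5. The sketch below performs equivalent parsing under the same separator assumption as the client side; ClientMd5Parser is hypothetical, and the '+'-joined key format is a simplification of Nacos's GroupKey.

```java
import java.util.HashMap;
import java.util.Map;

class ClientMd5Parser {
    // Assumed separators, matching the client's probe string.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    // Each line is dataId WS group WS md5 [WS tenant]; the result maps groupKey -> client MD5.
    static Map<String, String> parse(String probe) {
        Map<String, String> result = new HashMap<>();
        for (String line : probe.split(LINE_SEPARATOR)) {
            if (line.isEmpty()) {
                continue;
            }
            // limit -1 keeps an empty MD5 field instead of silently dropping it
            String[] p = line.split(WORD_SEPARATOR, -1);
            if (p.length == 3) {
                result.put(p[0] + "+" + p[1], p[2]);
            } else if (p.length == 4) {
                result.put(p[0] + "+" + p[1] + "+" + p[3], p[2]);
            } else {
                throw new IllegalArgumentException("invalid probeModify");
            }
        }
        return result;
    }
}
```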
inner.doPollingConfig
/** * Polling interface. */
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
// Long polling.
// If long polling is supported, take the long-polling path
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// Compatible with short polling logic.
List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
// Compatible with short polling result.
String oldResult = MD5Util.compareMd5OldResult(changedGroups);
String newResult = MD5Util.compareMd5ResultString(changedGroups);
String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
if (version == null) {
version = "2.0.0";
}
int versionNum = Protocol.getVersionNumber(version);
// Before 2.0.4 version, return value is put into header.
if (versionNum < START_LONG_POLLING_VERSION_NUM) {
response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
} else {
request.setAttribute("content", newResult);
}
// Disable cache.
response.setHeader("Pragma", "no-cache");
response.setDateHeader("Expires", 0);
response.setHeader("Cache-Control", "no-cache,no-store");
response.setStatus(HttpServletResponse.SC_OK);
return HttpServletResponse.SC_OK + "";
}
The short-polling path is just an ordinary HTTP request: the server compares the MD5 values and returns the result to the client immediately, whether or not any configuration has been updated.
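Whether the server answers at once or holds the request, the decision comes down to a per-group-key MD5 comparison, performed by MD5Util.compareMd5. A simplified sketch, assuming the server's current MD5 values are available as a plain map (in Nacos they are looked up through ConfigCacheService; beta and tag handling are omitted here). Md5Comparer is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class Md5Comparer {
    // Returns the group keys whose client MD5 differs from the server's current MD5.
    static List<String> changedGroups(Map<String, String> clientMd5Map, Map<String, String> serverMd5Map) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> e : clientMd5Map.entrySet()) {
            // An unknown key is treated as having an empty MD5 on the server.
            String serverMd5 = serverMd5Map.getOrDefault(e.getKey(), "");
            if (!serverMd5.equals(e.getValue())) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }
}
```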
Let's focus on the long-polling path.
longPollingService.addLongPollingClient
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
// Get the timeout in the request header
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
// Buffer time
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
// Get the timeout passed by the client and subtract the buffer time (500 ms in advance to avoid client timeouts) as the final timeout
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// Do nothing but set fix polling timeout.
} else {
long start = System.currentTimeMillis();
// Compare the client's MD5 values with the server's. If any group has changed, respond immediately via generateResponse.
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// Must be called by http thread, or send response.
// AsyncContext is an object provided in Servlet3.0. When startAsync is called, the response to the request is delayed and the container's allocated thread is freed.
// To implement the long polling mechanism.
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() is incorrect, Control by oneself
asyncContext.setTimeout(0L);
ConfigExecutor.executeLongPolling(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
This method does the following things:
- Read the timeout from the client's request header and compute the server-side timeout. For example, the client passes 30 s; subtracting the 500 ms buffer (so the response arrives before the client times out) leaves 29.5 s.
- Compare the client's MD5 for each groupKey with the server's. If any configuration has changed, return the result immediately, regardless of the timeout. Likewise, if the request carries the Long-Pulling-Timeout-No-Hangup header, return without holding the request.
- If nothing has changed at the time of the request, wrap the request in a ClientLongPolling task and hand it to ConfigExecutor.executeLongPolling.
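The timeout arithmetic is small but easy to misread. With the default 500 ms delayTime, a client timeout of 30 000 ms gives max(10000, 30000 - 500) = 29 500 ms, and the 10 s floor only matters when a client asks for less than 10.5 s. A sketch of the non-fixed-polling formula from addLongPollingClient (the class and method names are ours):

```java
class LongPollingTimeout {
    // Mirrors: Math.max(10000, Long.parseLong(str) - delayTime) in the non-fixed-polling case.
    static long serverHoldMillis(String timeoutHeader, int delayTimeMillis) {
        return Math.max(10000, Long.parseLong(timeoutHeader) - delayTimeMillis);
    }
}
```

For example, serverHoldMillis("30000", 500) yields 29500, while serverHoldMillis("8000", 500) is clamped to 10000.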
ClientLongPolling#run()
@Override
public void run() {
    asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
        @Override
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                // Delete subscriber's relations.
                boolean removeFlag = allSubs.remove(ClientLongPolling.this);
                if (removeFlag) {
                    if (isFixedPolling()) {
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        List<String> changedGroups = MD5Util
                                .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                        (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                        // Respond to the client
                        if (changedGroups.size() > 0) {
                            sendResponse(changedGroups);
                        } else {
                            sendResponse(null);
                        }
                    } else {
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                        RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                        "polling", clientMd5Map.size(), probeRequestSize);
                        sendResponse(null);
                    }
                } else {
                    LogUtil.DEFAULT_LOG.warn("client subsciber's relations delete fail.");
                }
            } catch (Throwable t) {
                LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
            }
        }
    }, timeoutTime, TimeUnit.MILLISECONDS);
    allSubs.add(this);
}
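What does the run method of ClientLongPolling do? It adds the request to allSubs and schedules a task that fires after the computed timeout (29.5 s in our example). When the task fires, it removes the request from allSubs and responds: in fixed-polling mode it compares the MD5 values again and returns any changed groups; otherwise it simply returns an empty result.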
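The hold-then-respond pattern can be sketched with plain JDK types: a ConcurrentLinkedQueue plays the role of allSubs, a CompletableFuture stands in for the AsyncContext response, and a ScheduledExecutorService fires the timeout. All names are hypothetical; only the structure follows ClientLongPolling#run. Unlike the Nacos code, the sketch registers the request before scheduling the timeout, so that a very short timeout cannot fire before registration (with Nacos's 10 s minimum this ordering makes no practical difference).

```java
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeldPollSketch {
    // Stand-in for allSubs: every request currently being held.
    final Queue<CompletableFuture<List<String>>> allSubs = new ConcurrentLinkedQueue<>();
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "long-polling-timeout");
        t.setDaemon(true); // do not keep the JVM alive just for pending timeouts
        return t;
    });

    // Hold the request; if still held when the timeout elapses, answer with an empty list.
    CompletableFuture<List<String>> hold(long timeoutMillis) {
        CompletableFuture<List<String>> response = new CompletableFuture<>();
        allSubs.add(response);
        scheduler.schedule(() -> {
            // Only whoever removes the entry responds, so a request is answered at most once.
            if (allSubs.remove(response)) {
                response.complete(Collections.emptyList());
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }
}
```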
This answers an earlier question: when nothing changes, how does checkUpdateConfigStr get a list of changed dataIds from the server? The server simply holds the request and returns an empty list after 29.5 s. Recall the two listening mechanisms introduced at the top of the article: Nacos uses the pull mechanism, in which the client actively requests data from the server. One drawback of pull is the resource cost of frequent scheduled requests, but with long polling the client sends only about one request per 30-second window while nothing changes, so this cost largely disappears.
However, there seems to be a problem. If every request takes 29.5 seconds to get a response, then in the worst case a configuration change takes 29.5 seconds to reach the client, which hardly meets real-time requirements.
What is needed is that when a user changes a configuration in the Nacos console, the held request is answered immediately.
allSubs#add
allSubs is a queue holding the ClientLongPolling objects that are currently waiting:
/** * ClientLongPolling subscibers. */
final Queue<ClientLongPolling> allSubs;
LongPollingService
In the LongPollingService constructor, NotifyCenter.registerSubscriber subscribes to LocalDataChangeEvent. When such an event arrives (that is, configuration data on the server has changed), a DataChangeTask is submitted for execution.
public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
    // Register A Subscriber to subscribe LocalDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                if (event instanceof LocalDataChangeEvent) {
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
}
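NotifyCenter decouples the code that publishes LocalDataChangeEvent from LongPollingService, which consumes it. The sketch below is a deliberately tiny, synchronous stand-in that shows the subscribe-by-event-type idea; MiniNotifyCenter, DataChanged, and their methods are hypothetical and are not the Nacos API (Nacos dispatches events asynchronously through the publisher registered with registerToPublisher).

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event type standing in for LocalDataChangeEvent.
class DataChanged {
    final String groupKey;
    DataChanged(String groupKey) { this.groupKey = groupKey; }
}

class MiniNotifyCenter {
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    // Register a handler for one event type (compare Subscriber#subscribeType).
    <E> void subscribe(Class<E> type, Consumer<E> handler) {
        subscribers.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(type.cast(event)));
    }

    // Deliver the event synchronously to every handler registered for its exact class.
    void publish(Object event) {
        List<Consumer<Object>> handlers = subscribers.getOrDefault(event.getClass(), Collections.emptyList());
        for (Consumer<Object> h : handlers) {
            h.accept(event);
        }
    }
}
```

Events of other types are simply ignored, just as the subscriber above reacts only to LocalDataChangeEvent.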
DataChangeTask#run
In the com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run method:
@Override
public void run() {
    try {
        ConfigCacheService.getContentBetaMd5(groupKey);
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            ClientLongPolling clientSub = iter.next();
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                // If published tag is not in the beta list, then it skipped.
                if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                    continue;
                }
                // If published tag is not in the tag list, then it skipped.
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                    continue;
                }
                getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                iter.remove(); // Delete subscribers' relationships.
                LogUtil.CLIENT_LOG
                        .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                RequestUtil
                                        .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {
        LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
    }
}
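This method iterates over every ClientLongPolling in allSubs. For each one whose clientMd5Map contains the changed groupKey (and that passes the beta and tag checks), it removes the subscription and responds immediately through clientSub.sendResponse. This is why a data change reaches clients in real time instead of after the 29.5 s wait.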
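The early-response path can be sketched on its own: held requests sit in a queue together with the group keys they watch, and when a key changes, every matching request is removed from the queue and answered at once while the others keep waiting. EarlyResponseSketch and HeldClient are hypothetical stand-ins for LongPollingService and ClientLongPolling; beta and tag filtering are omitted.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

class EarlyResponseSketch {
    // Hypothetical stand-in for ClientLongPolling: watched keys plus the pending response.
    static final class HeldClient {
        final Set<String> watchedKeys;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();
        HeldClient(Set<String> watchedKeys) { this.watchedKeys = watchedKeys; }
    }

    final Queue<HeldClient> allSubs = new ConcurrentLinkedQueue<>();

    HeldClient hold(Set<String> watchedKeys) {
        HeldClient c = new HeldClient(watchedKeys);
        allSubs.add(c);
        return c;
    }

    // Like DataChangeTask#run: answer every held client that watches the changed key.
    void onDataChange(String groupKey) {
        for (Iterator<HeldClient> it = allSubs.iterator(); it.hasNext(); ) {
            HeldClient c = it.next();
            if (c.watchedKeys.contains(groupKey)) {
                it.remove(); // stop holding this request
                c.response.complete(Collections.singletonList(groupKey));
            }
        }
    }
}
```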
The logic here on the server side:
- After receiving a request, check whether any data has changed. If it has, respond to the client immediately.
- If nothing has changed, schedule a delayed task that responds to the client after 29.5 s.
- When a configuration change occurs, a DataChangeTask is triggered and responds to the held client requests right away.
conclusion
This section outlined how Nacos implements a configuration center, mainly from the following aspects:
- The client requests configuration from the server and caches it locally; this is loaded through NacosPropertySourceLocator.
- The server handles adding, deleting, modifying, and querying configurations.
- The client becomes aware of configuration changes dynamically: the com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr method sets up a long poll with the server.
Put simply, with long polling the client sets a long timeout (30 s) when it pulls from the server. If there is no configuration change when the server receives the request, the server does not respond immediately but holds the request for 29.5 s, and the client just waits for the response. This avoids the empty polls of ordinary short polling and the resources they waste.
In addition, if a configuration changes while a client's long poll is pending, the server responds right away through event notification, cutting the 29.5 s wait short. This resembles a push mechanism, since the server proactively notifies the client when configuration is updated. Unlike true push, however, the server does not need to keep client connection information for a long time; it only holds each request for the duration of one poll.