Today we'll look at how Soul uses Nacos to synchronize data to the gateway.
1. Start Nacos
Install Nacos
> docker pull nacos/nacos-server
Start the Nacos service
> docker run --name nacos -d -p 8848:8848 -e MODE=standalone nacos/nacos-server
Access the Nacos service
> Open http://localhost:8848/nacos and log in with username nacos and password nacos. The Nacos service is running normally.
2. Start soul-admin
- Configure the application.yml configuration file and enable only the Nacos configuration.
    soul:
      database:
        dialect: mysql
        init_script: "META-INF/schema.sql"
        init_enable: true
      sync:
        websocket:
          enabled: false
        # zookeeper:
        #   url: localhost:2181
        #   sessionTimeout: 5000
        #   connectionTimeout: 2000
        # http:
        #   enabled: true
        nacos:
          url: localhost:8848
          namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
          acm:
            enabled: false
            endpoint: acm.aliyun.com
            namespace:
            accessKey:
            secretKey:
- Start SoulAdminBootstrap.
3. Start soul-bootstrap
- Add the Nacos dependency to pom.xml.
    <dependency>
        <groupId>org.dromara</groupId>
        <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
        <version>${project.version}</version>
    </dependency>
- Configure application-local.yml and enable only the Nacos configuration.
    soul:
      file:
        enabled: true
      corss:
        enabled: true
      # dubbo:
      #   parameter: multi
      sync:
        # websocket:
        #   urls: ws://localhost:9095/websocket
        # zookeeper:
        #   url: localhost:2181
        #   sessionTimeout: 5000
        #   connectionTimeout: 2000
        # http:
        #   url: http://localhost:9095
        nacos:
          url: localhost:8848
          namespace: 1c10d748-af86-43b9-8265-75f487d20c6c
          acm:
            enabled: false
            endpoint: acm.aliyun.com
            namespace:
            accessKey:
            secretKey:
- Start SoulBootstrapApplication.
Source code analysis:
1. Analyze the Nacos source code of soul-admin
- Starting from the soul-admin configuration file, we search for where soul.sync.nacos is referenced and find two classes: NacosListener inside DataSyncConfiguration, and NacosSyncDataConfiguration. The first is loaded when soul-admin starts; the second is loaded when soul-bootstrap starts. From the analysis in the previous articles, we know that soul-admin loads the NacosListener of DataSyncConfiguration at startup according to the Soul configuration. After that, the afterPropertiesSet method of DataChangedEventDispatcher runs and collects the listeners, so that data change events can be dispatched to them.
    @Configuration
    public class DataSyncConfiguration {

        /**
         * The type Nacos listener.
         */
        @Configuration
        @ConditionalOnProperty(prefix = "soul.sync.nacos", name = "url")
        @Import(NacosConfiguration.class)
        static class NacosListener {

            /**
             * Data changed listener data changed listener.
             *
             * @param configService the config service
             * @return the data changed listener
             */
            @Bean
            @ConditionalOnMissingBean(NacosDataChangedListener.class)
            public DataChangedListener nacosDataChangedListener(final ConfigService configService) {
                return new NacosDataChangedListener(configService);
            }
        }

        // ...
    }
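The @ConditionalOnProperty annotation on NacosListener is what ties the YAML to the listener: with no havingValue given, Spring Boot matches when the property exists and is not equal to false. The following is a minimal sketch of that rule in plain Java, not Spring's implementation. The class SyncModeSelector and the method isActive are hypothetical names introduced for illustration, and the map is an assumed flattened view of the soul-admin YAML.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that mimics the property check; not part of Soul or Spring. */
final class SyncModeSelector {

    private SyncModeSelector() {
    }

    /** True when prefix.name is present and its value is not "false" (case-insensitive). */
    static boolean isActive(final Map<String, String> props, final String prefix, final String name) {
        String value = props.get(prefix + "." + name);
        return value != null && !"false".equalsIgnoreCase(value);
    }

    public static void main(final String[] args) {
        // Assumed flattened view of the soul-admin YAML: only nacos has a url.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("soul.sync.websocket.enabled", "false");
        props.put("soul.sync.nacos.url", "localhost:8848");

        System.out.println("nacos active: " + isActive(props, "soul.sync.nacos", "url"));
        System.out.println("websocket active: " + isActive(props, "soul.sync.websocket", "enabled"));
        System.out.println("zookeeper active: " + isActive(props, "soul.sync.zookeeper", "url"));
    }
}
```

Under this rule, commenting out the zookeeper and http blocks removes their properties entirely, which is why only the Nacos listener is registered.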
    @Component
    public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

        private ApplicationContext applicationContext;

        private List<DataChangedListener> listeners;

        public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void onApplicationEvent(final DataChangedEvent event) {
            for (DataChangedListener listener : listeners) {
                switch (event.getGroupKey()) {
                    case APP_AUTH:
                        listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                        break;
                    case PLUGIN:
                        listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                        break;
                    case RULE:
                        listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                        break;
                    case SELECTOR:
                        listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                        break;
                    case META_DATA:
                        listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                        break;
                    default:
                        throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
                }
            }
        }

        @Override
        public void afterPropertiesSet() {
            Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
            this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
        }
    }
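The dispatcher above does two things: it takes a read-only snapshot of all listeners once, and it routes every event to each listener by group key. A minimal sketch of that pattern without Spring follows. DispatchSketch, its two-value GroupKey enum, and the simplified Listener interface are hypothetical stand-ins, not Soul's types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DispatchSketch {

    /** Hypothetical subset of the group keys used in the switch above. */
    enum GroupKey { PLUGIN, RULE }

    /** Hypothetical, simplified stand-in for DataChangedListener. */
    interface Listener {
        default void onPluginChanged(List<String> data) { }

        default void onRuleChanged(List<String> data) { }
    }

    private final List<Listener> listeners;

    DispatchSketch(final List<Listener> found) {
        // Same idea as afterPropertiesSet(): snapshot the listeners once, read-only.
        this.listeners = Collections.unmodifiableList(new ArrayList<>(found));
    }

    /** Fan the event out to every listener, routing by group key. */
    void dispatch(final GroupKey key, final List<String> data) {
        for (Listener listener : listeners) {
            switch (key) {
                case PLUGIN:
                    listener.onPluginChanged(data);
                    break;
                case RULE:
                    listener.onRuleChanged(data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + key);
            }
        }
    }

    /** Registers two listeners, fires two events, and returns the callbacks that ran. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        Listener nacos = new Listener() {
            @Override
            public void onPluginChanged(final List<String> data) {
                calls.add("nacos:plugin:" + data);
            }
        };
        Listener other = new Listener() {
            @Override
            public void onRuleChanged(final List<String> data) {
                calls.add("other:rule:" + data);
            }
        };
        DispatchSketch dispatcher = new DispatchSketch(Arrays.asList(nacos, other));
        dispatcher.dispatch(GroupKey.PLUGIN, Arrays.asList("divide"));
        dispatcher.dispatch(GroupKey.RULE, Arrays.asList("r1"));
        return calls;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Every listener sees every event; a listener that does not care about a group simply leaves that callback empty.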
- Debugging confirms that the behavior is as expected.
2. Analyze the Nacos source code of soul-bootstrap
- Starting from soul-bootstrap's application-local.yml configuration file, we search for where soul.sync.nacos is referenced and find NacosSyncDataConfiguration, which creates NacosSyncDataService:
    @Configuration
    @ConditionalOnClass(NacosSyncDataService.class)
    @ConditionalOnProperty(prefix = "soul.sync.nacos", name = "url")
    @Slf4j
    public class NacosSyncDataConfiguration {

        /**
         * Nacos sync data service.
         *
         * @param configService the config service
         * @param pluginSubscriber the plugin subscriber
         * @param metaSubscribers the meta subscribers
         * @param authSubscribers the auth subscribers
         * @return the sync data service
         */
        @Bean
        public SyncDataService nacosSyncDataService(final ObjectProvider<ConfigService> configService,
                                                    final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                                    final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                                    final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
            log.info("you use nacos sync soul data.......");
            return new NacosSyncDataService(configService.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                    metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
        }

        /**
         * Nacos config service config service.
         *
         * @param nacosConfig the nacos config
         * @return the config service
         * @throws Exception the exception
         */
        @Bean
        public ConfigService nacosConfigService(final NacosConfig nacosConfig) throws Exception {
            Properties properties = new Properties();
            if (nacosConfig.getAcm() != null && nacosConfig.getAcm().isEnabled()) {
                properties.put(PropertyKeyConst.ENDPOINT, nacosConfig.getAcm().getEndpoint());
                properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getAcm().getNamespace());
                properties.put(PropertyKeyConst.ACCESS_KEY, nacosConfig.getAcm().getAccessKey());
                properties.put(PropertyKeyConst.SECRET_KEY, nacosConfig.getAcm().getSecretKey());
            } else {
                properties.put(PropertyKeyConst.SERVER_ADDR, nacosConfig.getUrl());
                properties.put(PropertyKeyConst.NAMESPACE, nacosConfig.getNamespace());
            }
            return NacosFactory.createConfigService(properties);
        }

        /**
         * Http config http config.
         *
         * @return the http config
         */
        @Bean
        @ConfigurationProperties(prefix = "soul.sync.nacos")
        public NacosConfig nacosConfig() {
            return new NacosConfig();
        }
    }
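The nacosConfigService bean above picks one of two property sets: ACM credentials when acm.enabled is true, otherwise the plain server address and namespace. A minimal sketch of that branch using only java.util.Properties follows. NacosPropertiesSketch and its Config class are hypothetical, and the string keys (serverAddr, namespace, endpoint, accessKey, secretKey) are assumed to correspond to the PropertyKeyConst constants rather than taken from the listing.

```java
import java.util.Properties;

final class NacosPropertiesSketch {

    /** Hypothetical, trimmed-down holder for the soul.sync.nacos settings. */
    static final class Config {
        String url;
        String namespace;
        boolean acmEnabled;
        String acmEndpoint;
        String acmNamespace;
        String acmAccessKey;
        String acmSecretKey;
    }

    private NacosPropertiesSketch() {
    }

    /** Builds the client properties; key names are assumptions, see the lead-in. */
    static Properties build(final Config c) {
        Properties p = new Properties();
        if (c.acmEnabled) {
            // ACM branch: endpoint plus credentials, no direct server address.
            p.put("endpoint", c.acmEndpoint);
            p.put("namespace", c.acmNamespace);
            p.put("accessKey", c.acmAccessKey);
            p.put("secretKey", c.acmSecretKey);
        } else {
            // Self-hosted branch: server address and namespace only.
            p.put("serverAddr", c.url);
            p.put("namespace", c.namespace);
        }
        return p;
    }

    public static void main(final String[] args) {
        Config c = new Config();
        c.url = "localhost:8848";
        c.namespace = "1c10d748-af86-43b9-8265-75f487d20c6c";
        System.out.println(build(c));
    }
}
```

Note that Properties rejects null values, so in the self-hosted branch both url and namespace need to be set in the YAML, as they are in the configuration shown earlier.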
    public class NacosSyncDataService extends NacosCacheHandler implements AutoCloseable, SyncDataService {

        /**
         * Instantiates a new Nacos sync data service.
         *
         * @param configService the config service
         * @param pluginDataSubscriber the plugin data subscriber
         * @param metaDataSubscribers the meta data subscribers
         * @param authDataSubscribers the auth data subscribers
         */
        public NacosSyncDataService(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
            super(configService, pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
            start();
        }

        /**
         * Start.
         */
        public void start() {
            // Register one watcher per data id; each callback refreshes the matching local cache.
            watcherData(PLUGIN_DATA_ID, this::updatePluginMap);
            watcherData(SELECTOR_DATA_ID, this::updateSelectorMap);
            watcherData(RULE_DATA_ID, this::updateRuleMap);
            watcherData(META_DATA_ID, this::updateMetaDataMap);
            watcherData(AUTH_DATA_ID, this::updateAuthMap);
        }

        @Override
        public void close() {
            // Remove every registered listener from Nacos, then empty the registry.
            LISTENERS.forEach((dataId, lss) -> {
                lss.forEach(listener -> getConfigService().removeListener(dataId, GROUP, listener));
                lss.clear();
            });
            LISTENERS.clear();
        }
    }
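NacosSyncDataService calls start() from its constructor, and start() sets up one watcher per data id through watcherData, which is defined in NacosCacheHandler. The pattern is "read the current value once, then receive every later change". A minimal sketch of it with an in-memory stand-in for the config server follows. FakeConfigStore and WatcherSketch are hypothetical classes and do not use the Nacos client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** In-memory stand-in for a config server; hypothetical, not the Nacos API. */
final class FakeConfigStore {

    private final Map<String, String> values = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    /** Returns the current value (null if none) and registers the listener for later changes. */
    String getAndListen(final String dataId, final Consumer<String> listener) {
        listeners.computeIfAbsent(dataId, k -> new CopyOnWriteArrayList<>()).add(listener);
        return values.get(dataId);
    }

    /** Stores a new value and pushes it to every listener of that data id. */
    void publish(final String dataId, final String content) {
        values.put(dataId, content);
        listeners.getOrDefault(dataId, Collections.emptyList()).forEach(l -> l.accept(content));
    }

    /** Drops all listeners, comparable to what close() is meant to do. */
    void removeAll() {
        listeners.clear();
    }
}

final class WatcherSketch {

    final List<String> seen = new ArrayList<>();

    /** Applies the current value once, then again on every published change. */
    void watch(final FakeConfigStore store, final String dataId) {
        Consumer<String> onChange = content -> seen.add(dataId + "=" + content);
        onChange.accept(store.getAndListen(dataId, onChange));
    }

    /** Publishes before and after watching, and once more after the listeners are removed. */
    static List<String> demo() {
        FakeConfigStore store = new FakeConfigStore();
        WatcherSketch watcher = new WatcherSketch();
        store.publish("soul.plugin.json", "v1");
        watcher.watch(store, "soul.plugin.json");
        store.publish("soul.plugin.json", "v2");
        store.removeAll();
        store.publish("soul.plugin.json", "v3");
        return watcher.seen;
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

The initial read is what lets a freshly started gateway pick up data that soul-admin published before it came online; the listener covers everything published afterwards.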
    @Slf4j
    public class NacosCacheHandler {

        protected static final String GROUP = "DEFAULT_GROUP";

        protected static final String PLUGIN_DATA_ID = "soul.plugin.json";

        protected static final String SELECTOR_DATA_ID = "soul.selector.json";

        protected static final String RULE_DATA_ID = "soul.rule.json";

        protected static final String AUTH_DATA_ID = "soul.auth.json";

        protected static final String META_DATA_ID = "soul.meta.json";

        protected static final Map<String, List<Listener>> LISTENERS = Maps.newConcurrentMap();

        @Getter
        private final ConfigService configService;

        private final PluginDataSubscriber pluginDataSubscriber;

        private final List<MetaDataSubscriber> metaDataSubscribers;

        private final List<AuthDataSubscriber> authDataSubscribers;

        public NacosCacheHandler(final ConfigService configService, final PluginDataSubscriber pluginDataSubscriber,
                                 final List<MetaDataSubscriber> metaDataSubscribers,
                                 final List<AuthDataSubscriber> authDataSubscribers) {
            this.configService = configService;
            this.pluginDataSubscriber = pluginDataSubscriber;
            this.metaDataSubscribers = metaDataSubscribers;
            this.authDataSubscribers = authDataSubscribers;
        }

        protected void updatePluginMap(final String configInfo) {
            try {
                // Fix bug #656(https://github.com/dromara/soul/issues/656)
                List<PluginData> pluginDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, PluginData.class).values());
                pluginDataList.forEach(pluginData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                    subscriber.unSubscribe(pluginData);
                    subscriber.onSubscribe(pluginData);
                }));
            } catch (JsonParseException e) {
                log.error("sync plugin data have error:", e);
            }
        }

        protected void updateSelectorMap(final String configInfo) {
            try {
                List<SelectorData> selectorDataList = GsonUtils.getInstance().toObjectMapList(configInfo, SelectorData.class).values()
                        .stream().flatMap(Collection::stream)
                        .collect(Collectors.toList());
                selectorDataList.forEach(selectorData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                    subscriber.unSelectorSubscribe(selectorData);
                    subscriber.onSelectorSubscribe(selectorData);
                }));
            } catch (JsonParseException e) {
                log.error("sync selector data have error:", e);
            }
        }

        protected void updateRuleMap(final String configInfo) {
            try {
                List<RuleData> ruleDataList = GsonUtils.getInstance().toObjectMapList(configInfo, RuleData.class).values()
                        .stream().flatMap(Collection::stream)
                        .collect(Collectors.toList());
                ruleDataList.forEach(ruleData -> Optional.ofNullable(pluginDataSubscriber).ifPresent(subscriber -> {
                    subscriber.unRuleSubscribe(ruleData);
                    subscriber.onRuleSubscribe(ruleData);
                }));
            } catch (JsonParseException e) {
                log.error("sync rule data have error:", e);
            }
        }

        protected void updateMetaDataMap(final String configInfo) {
            try {
                List<MetaData> metaDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, MetaData.class).values());
                metaDataList.forEach(metaData -> metaDataSubscribers.forEach(subscriber -> {
                    subscriber.unSubscribe(metaData);
                    subscriber.onSubscribe(metaData);
                }));
            } catch (JsonParseException e) {
                log.error("sync meta data have error:", e);
            }
        }

        protected void updateAuthMap(final String configInfo) {
            try {
                List<AppAuthData> appAuthDataList = new ArrayList<>(GsonUtils.getInstance().toObjectMap(configInfo, AppAuthData.class).values());
                appAuthDataList.forEach(appAuthData -> authDataSubscribers.forEach(subscriber -> {
                    subscriber.unSubscribe(appAuthData);
                    subscriber.onSubscribe(appAuthData);
                }));
            } catch (JsonParseException e) {
                log.error("sync auth data have error:", e);
            }
        }

        @SneakyThrows
        private String getConfigAndSignListener(final String dataId, final Listener listener) {
            return configService.getConfigAndSignListener(dataId, GROUP, 6000, listener);
        }

        protected void watcherData(final String dataId, final OnChange oc) {
            Listener listener = new Listener() {
                @Override
                public void receiveConfigInfo(final String configInfo) {
                    oc.change(configInfo);
                }

                @Override
                public Executor getExecutor() {
                    return null;
                }
            };
            oc.change(getConfigAndSignListener(dataId, listener));
            LISTENERS.getOrDefault(dataId, new ArrayList<>()).add(listener);
        }

        protected interface OnChange {

            void change(String changeData);
        }
    }
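One detail in watcherData deserves a closer look. In the listing, LISTENERS.getOrDefault(dataId, new ArrayList<>()).add(listener) adds the listener to a fallback list that is never stored in the map, and nothing else in the listing puts entries into LISTENERS, so close() would find it empty. The sketch below shows the difference between getOrDefault and computeIfAbsent using plain java.util collections. ListenerRegistrySketch and its method names are hypothetical, and whether other Soul versions handle this line differently is not covered here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ListenerRegistrySketch {

    private ListenerRegistrySketch() {
    }

    /** getOrDefault returns the fallback list without storing it, so the map stays empty. */
    static int sizeWithGetOrDefault() {
        Map<String, List<String>> registry = new ConcurrentHashMap<>();
        registry.getOrDefault("soul.plugin.json", new ArrayList<>()).add("listener");
        return registry.size();
    }

    /** computeIfAbsent stores the new list under the key before returning it. */
    static int sizeWithComputeIfAbsent() {
        Map<String, List<String>> registry = new ConcurrentHashMap<>();
        registry.computeIfAbsent("soul.plugin.json", k -> new ArrayList<>()).add("listener");
        return registry.size();
    }

    public static void main(final String[] args) {
        System.out.println("getOrDefault entries: " + sizeWithGetOrDefault());
        System.out.println("computeIfAbsent entries: " + sizeWithComputeIfAbsent());
    }
}
```

With computeIfAbsent the registry keeps track of each listener, which is what a later removeListener loop needs in order to unregister them.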
- We debug to validate this.
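Besides stepping through the debugger, one way to check what soul-admin has published is to read the data ids directly from Nacos. The sketch below only builds the request URL for the Nacos v1 Open API config endpoint (GET /nacos/v1/cs/configs, where the tenant parameter carries the namespace id). NacosOpenApiSketch and configUrl are hypothetical names, and the sketch assumes a server without authentication enabled.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class NacosOpenApiSketch {

    private NacosOpenApiSketch() {
    }

    /** Builds the GET URL for one data id; pass the namespace id from the YAML as tenant. */
    static String configUrl(final String serverAddr, final String namespace,
                            final String dataId, final String group) {
        return "http://" + serverAddr + "/nacos/v1/cs/configs"
                + "?dataId=" + encode(dataId)
                + "&group=" + encode(group)
                + "&tenant=" + encode(namespace);
    }

    private static String encode(final String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM; rethrow unchecked just in case.
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        String namespace = "1c10d748-af86-43b9-8265-75f487d20c6c";
        String[] dataIds = {"soul.plugin.json", "soul.selector.json", "soul.rule.json",
                            "soul.auth.json", "soul.meta.json"};
        for (String dataId : dataIds) {
            System.out.println(configUrl("localhost:8848", namespace, dataId, "DEFAULT_GROUP"));
        }
    }
}
```

Opening one of the printed URLs in a browser or with curl should return the JSON that the gateway's watcher receives for that data id, provided soul-admin has already published it.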
Conclusion
At this point, we understand how Soul uses Nacos to synchronize data from soul-admin to the gateway.