Today we'll look at how Soul uses ZooKeeper to synchronize data to the gateway.
1. Start ZooKeeper
Install ZooKeeper
> brew install zookeeper
View the configuration file. The default configuration file is in the following directory
> cd /usr/local/etc/zookeeper/
Start the service
> zkServer start
2. Start soul-admin
3. Start soul-bootstrap
- Add the ZooKeeper configuration to application-local.yml
soul:
  sync:
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
- Run SoulBootstrapApplication
Source code analysis:
- Soul is configured to use WebSocket synchronization by default. The WebSocket source code is covered in the previous article (juejin.cn/post/691988…).
- As in the previous article, we added the ZooKeeper configuration to the soul-bootstrap configuration file application-local.yml. Searching the code for soul.sync.zookeeper turns up four classes: ZookeeperConfig, ZookeeperProperties, ZookeeperListener, and ZookeeperSyncDataConfiguration. ZookeeperConfig and ZookeeperProperties hold the ZooKeeper settings read from the synchronization section of the configuration file. ZookeeperListener and ZookeeperSyncDataConfiguration set up the ZooKeeper listener and the data synchronization. ZookeeperListener is not part of ZookeeperSyncDataConfiguration, so starting soul-bootstrap initializes ZookeeperSyncDataConfiguration.
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

    /**
     * Config event listener data changed listener.
     *
     * @param zkClient the zk client
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
    public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
        return new ZookeeperDataChangedListener(zkClient);
    }

    /**
     * Zookeeper data init zookeeper data init.
     *
     * @param zkClient        the zk client
     * @param syncDataService the sync data service
     * @return the zookeeper data init
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataInit.class)
    public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        return new ZookeeperDataInit(zkClient, syncDataService);
    }
}
public class ZookeeperSyncDataConfiguration {

    /**
     * Creates the sync data service.
     *
     * @param zkClient         the zk client
     * @param pluginSubscriber the plugin subscriber
     * @param metaSubscribers  the meta subscribers
     * @param authSubscribers  the auth subscribers
     * @return the sync data service
     */
    @Bean
    public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient,
                                           final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers,
                                           final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use zookeeper sync soul data.......");
        return new ZookeeperSyncDataService(zkClient.getIfAvailable(), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList),
                authSubscribers.getIfAvailable(Collections::emptyList));
    }

    /**
     * register zkClient in spring ioc.
     *
     * @param zookeeperConfig the zookeeper configuration
     * @return ZkClient {@linkplain ZkClient}
     */
    @Bean
    public ZkClient zkClient(final ZookeeperConfig zookeeperConfig) {
        return new ZkClient(zookeeperConfig.getUrl(), zookeeperConfig.getSessionTimeout(),
                zookeeperConfig.getConnectionTimeout());
    }
}
- Let's start soul-bootstrap and debug the classes above.
We found that when the project starts, the SyncDataService defined in ZookeeperSyncDataConfiguration is initialized first, and it then sets up watches for plugin data, appAuth, and metaData. Because the local data is empty at startup, the initial data is read from ZooKeeper and cached locally, and any missing ZooKeeper nodes are created.
private void watcherData() {
    final String pluginParent = ZkPathConstants.PLUGIN_PARENT;
    List<String> pluginZKs = zkClientGetChildren(pluginParent);
    for (String pluginName : pluginZKs) {
        watcherAll(pluginName);
    }
    zkClient.subscribeChildChanges(pluginParent, (parentPath, currentChildren) -> {
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            for (String pluginName : currentChildren) {
                watcherAll(pluginName);
            }
        }
    });
}

// When the project starts, the Plugin, Selector, and Rule of each plugin are cached in
// PLUGIN_MAP, SELECTOR_MAP, and RULE_MAP of the local cache BaseDataCache, and the
// corresponding zk nodes are created.
// In the same way, AppAuthData is cached in AUTH_MAP of SignAuthDataCache and MetaData in
// META_DATA_MAP of MetaDataCache, with their zk nodes created (see watchAppAuth and watchMetaData).
private void watcherAll(final String pluginName) {
    watcherPlugin(pluginName);
    watcherSelector(pluginName);
    watcherRule(pluginName);
}
private void watcherPlugin(final String pluginName) {
    String pluginPath = ZkPathConstants.buildPluginPath(pluginName);
    // Create the node if it does not exist yet
    if (!zkClient.exists(pluginPath)) {
        zkClient.createPersistent(pluginPath, true);
    }
    // Cache the current data, then subscribe to later changes
    cachePluginData(zkClient.readData(pluginPath));
    subscribePluginDataChanges(pluginPath, pluginName);
}

private void cachePluginData(final PluginData pluginData) {
    Optional.ofNullable(pluginData)
            .flatMap(data -> Optional.ofNullable(pluginDataSubscriber))
            .ifPresent(e -> e.onSubscribe(pluginData));
}

// In CommonPluginDataSubscriber
@Override
public void onSubscribe(final PluginData pluginData) {
    subscribeDataHandler(pluginData, DataEventTypeEnum.UPDATE);
}
public class CommonPluginDataSubscriber implements PluginDataSubscriber {

    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cachePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName()))
                            .ifPresent(handler -> handler.handlerPlugin(pluginData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName()))
                            .ifPresent(handler -> handler.removePlugin(pluginData));
                }
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName()))
                            .ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName()))
                            .ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {
                RuleData ruleData = (RuleData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    BaseDataCache.getInstance().cacheRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName()))
                            .ifPresent(handler -> handler.handlerRule(ruleData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    BaseDataCache.getInstance().removeRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName()))
                            .ifPresent(handler -> handler.removeRule(ruleData));
                }
            }
        });
    }
    // ...
}
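The handler above applies one rule to each data type: an UPDATE event writes the data into the local cache, and a DELETE event removes it. A minimal sketch of that rule for plugin data only, assuming a simplified Plugin type and a plain map in place of BaseDataCache's PLUGIN_MAP. The class names and the plugin name "divide" are illustrative and are not taken from the Soul source:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PluginCacheSketch {

    enum EventType { UPDATE, DELETE }

    // Hypothetical stand-in for PluginData: only a name and an enabled flag.
    static final class Plugin {
        final String name;
        final boolean enabled;

        Plugin(String name, boolean enabled) {
            this.name = name;
            this.enabled = enabled;
        }
    }

    // Plays the role of the local plugin cache, keyed by plugin name.
    private final Map<String, Plugin> pluginMap = new ConcurrentHashMap<>();

    // UPDATE stores (or overwrites) the entry, DELETE removes it; null data is ignored.
    void onEvent(Plugin plugin, EventType type) {
        if (plugin == null) {
            return;
        }
        if (type == EventType.UPDATE) {
            pluginMap.put(plugin.name, plugin);
        } else if (type == EventType.DELETE) {
            pluginMap.remove(plugin.name);
        }
    }

    Plugin get(String name) {
        return pluginMap.get(name);
    }

    public static void main(String[] args) {
        PluginCacheSketch cache = new PluginCacheSketch();
        cache.onEvent(new Plugin("divide", true), EventType.UPDATE);
        System.out.println(cache.get("divide").enabled); // true
        cache.onEvent(new Plugin("divide", false), EventType.UPDATE);
        System.out.println(cache.get("divide").enabled); // false
        cache.onEvent(new Plugin("divide", false), EventType.DELETE);
        System.out.println(cache.get("divide")); // null
    }
}
```

The real class additionally looks up a per-plugin handler in handlerMap and notifies it after the cache is changed; the sketch leaves that step out.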
private void watchAppAuth() {
    final String appAuthParent = ZkPathConstants.APP_AUTH_PARENT;
    List<String> childrenList = zkClientGetChildren(appAuthParent);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        childrenList.forEach(children -> {
            String realPath = buildRealPath(appAuthParent, children);
            cacheAuthData(zkClient.readData(realPath));
            subscribeAppAuthDataChanges(realPath);
        });
    }
    subscribeChildChanges(ConfigGroupEnum.APP_AUTH, appAuthParent, childrenList);
}
private void watchMetaData() {
    final String metaDataPath = ZkPathConstants.META_DATA;
    List<String> childrenList = zkClientGetChildren(metaDataPath);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        childrenList.forEach(children -> {
            String realPath = buildRealPath(metaDataPath, children);
            cacheMetaData(zkClient.readData(realPath));
            subscribeMetaDataChanges(realPath);
        });
    }
    subscribeChildChanges(ConfigGroupEnum.APP_AUTH, metaDataPath, childrenList);
}
private List<String> zkClientGetChildren(final String parent) {
    if (!zkClient.exists(parent)) {
        zkClient.createPersistent(parent, true);
    }
    return zkClient.getChildren(parent);
}
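The watch methods above all follow the same order: make sure the node exists, read and cache its current data, then subscribe to later changes. A minimal sketch of that order, assuming an in-memory FakeZk class in place of a real ZooKeeper client. FakeZk, its methods, and the example path are hypothetical and are not the ZkClient API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class WatchAndCacheSketch {

    // Hypothetical in-memory stand-in for a ZooKeeper client; not the real ZkClient API.
    static final class FakeZk {
        private final Map<String, String> nodes = new HashMap<>();
        private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

        boolean exists(String path) {
            return nodes.containsKey(path);
        }

        // Creates an empty node (no data yet) if it is missing.
        void create(String path) {
            if (!exists(path)) {
                nodes.put(path, null);
            }
        }

        String readData(String path) {
            return nodes.get(path);
        }

        // Writes data and notifies subscribers, as a write coming from the admin side would.
        void writeData(String path, String data) {
            nodes.put(path, data);
            for (Consumer<String> watcher : watchers.getOrDefault(path, List.of())) {
                watcher.accept(data);
            }
        }

        void subscribe(String path, Consumer<String> listener) {
            watchers.computeIfAbsent(path, key -> new ArrayList<>()).add(listener);
        }
    }

    // Plays the role of the gateway's local cache.
    final Map<String, String> localCache = new HashMap<>();

    // Same order as watcherPlugin: ensure the node, cache current data, subscribe to changes.
    void watch(FakeZk zk, String path) {
        if (!zk.exists(path)) {
            zk.create(path);
        }
        String current = zk.readData(path);
        if (current != null) {
            localCache.put(path, current);
        }
        zk.subscribe(path, data -> localCache.put(path, data));
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        WatchAndCacheSketch gateway = new WatchAndCacheSketch();
        gateway.watch(zk, "/soul/plugin/divide");
        System.out.println(gateway.localCache); // {} because the node was created empty
        zk.writeData("/soul/plugin/divide", "enabled=false");
        System.out.println(gateway.localCache); // {/soul/plugin/divide=enabled=false}
    }
}
```

Subscribing only after the initial read keeps the example short; a real client also has to handle changes that arrive between the read and the subscription.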
- We changed a plugin under System Manage -> Plugin in the soul-admin console and found that the DataChangedEvent was still handled by WebsocketDataChangedListener. ZookeeperDataChangedListener did not take effect.
We then checked the ZookeeperDataChangedListener initialization code in soul-admin and its configuration file application.yml. ZookeeperDataChangedListener is initialized only when soul.sync.zookeeper.url has a value, and WebsocketListener is skipped only when soul.sync.websocket.enabled = false. So we set soul.sync.websocket.enabled to false, added the corresponding zookeeper configuration, and restarted soul-admin and soul-bootstrap.
@Configuration
@ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url")
@Import(ZookeeperConfiguration.class)
static class ZookeeperListener {

    /**
     * Config event listener data changed listener.
     *
     * @param zkClient the zk client
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataChangedListener.class)
    public DataChangedListener zookeeperDataChangedListener(final ZkClient zkClient) {
        return new ZookeeperDataChangedListener(zkClient);
    }

    /**
     * Zookeeper data init zookeeper data init.
     *
     * @param zkClient        the zk client
     * @param syncDataService the sync data service
     * @return the zookeeper data init
     */
    @Bean
    @ConditionalOnMissingBean(ZookeeperDataInit.class)
    public ZookeeperDataInit zookeeperDataInit(final ZkClient zkClient, final SyncDataService syncDataService) {
        return new ZookeeperDataInit(zkClient, syncDataService);
    }
}
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {

    /**
     * Config event listener data changed listener.
     *
     * @return the data changed listener
     */
    @Bean
    @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
    public DataChangedListener websocketDataChangedListener() {
        return new WebsocketDataChangedListener();
    }
    ...
}
soul:
  database:
    dialect: mysql
    init_script: "META-INF/schema.sql"
    init_enable: true
  sync:
    websocket:
      enabled: false
    zookeeper:
      url: localhost:2181
      sessionTimeout: 5000
      connectionTimeout: 2000
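The two @ConditionalOnProperty annotations explain why the WebSocket listener stayed active until it was switched off explicitly. The sketch below re-implements only the matching rule as plain Java, as an illustration. It assumes the properties are available as a simple map, and the class and method names are hypothetical and are not Spring or Soul APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class SyncListenerSelection {

    // Mirrors @ConditionalOnProperty(prefix = "soul.sync.zookeeper", name = "url"):
    // with no havingValue, the condition matches when the property is present and not "false".
    static boolean zookeeperEnabled(Map<String, String> props) {
        String url = props.get("soul.sync.zookeeper.url");
        return url != null && !"false".equalsIgnoreCase(url);
    }

    // Mirrors @ConditionalOnProperty(name = "soul.sync.websocket.enabled",
    // havingValue = "true", matchIfMissing = true): a missing property also matches.
    static boolean websocketEnabled(Map<String, String> props) {
        String enabled = props.get("soul.sync.websocket.enabled");
        return enabled == null || "true".equalsIgnoreCase(enabled);
    }

    // Lists which listener beans the two conditions would allow.
    static List<String> activeListeners(Map<String, String> props) {
        List<String> active = new ArrayList<>();
        if (websocketEnabled(props)) {
            active.add("WebsocketDataChangedListener");
        }
        if (zookeeperEnabled(props)) {
            active.add("ZookeeperDataChangedListener");
        }
        return active;
    }

    public static void main(String[] args) {
        // No sync properties at all: only the WebSocket listener matches.
        System.out.println(activeListeners(Map.of()));
        // The configuration used in this article: only the ZooKeeper listener matches.
        System.out.println(activeListeners(Map.of(
                "soul.sync.websocket.enabled", "false",
                "soul.sync.zookeeper.url", "localhost:2181")));
    }
}
```

Under these rules, setting the ZooKeeper url without disabling WebSocket leaves both conditions satisfied, which is why the article sets soul.sync.websocket.enabled to false as well.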
We then modified a plugin again under System Manage -> Plugin and found that ZookeeperDataChangedListener took effect: the change made on the page was applied to the local cache.
Conclusion
To verify that ZooKeeper synchronizes data, disable the default WebSocket sync and enable the ZooKeeper configuration. By analyzing the configuration, where it is applied, and which classes read it, and then debugging, you can locate problems accurately.