Today we will look at the Soul Websocket. We will also use the soul-examples-Spring Cloud service as an example. 1. Start sole-admin 2. Start Eureka-server 3
Source code analysis:
-
Let’s look at the configuration file application-local.yml in sole-Bootstrap
soul : file: enabled: true corss: enabled: true dubbo : parameter: multi sync: websocket : urls: ws://localhost:9095/websocket Copy the code
Websocket is configured on Soul, we search the sole in the project. The sync. Websocket, we find the WebsocketSyncDataConfiguration, We send WebsocketSyncDataConfiguration in loading beans SyncDataService, executes WebsocketSyncDataService constructor. The WebsocketSyncDataService constructor establishes a SoleWebsocketClient connection once, then iterates over the WebSocket connection every 30 seconds and re-establishes the connection if it breaks.
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
@Slf4j
public class WebsocketSyncDataConfiguration {
/**
* Websocket sync data service.
*
* @param websocketConfig the websocket config
* @param pluginSubscriber the plugin subscriber
* @param metaSubscribers the meta subscribers
* @param authSubscribers the auth subscribers
* @return the sync data service
*/
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Config websocket config.
*
* @return the websocket config
*/
@Bean
@ConfigurationProperties(prefix = "soul.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
Copy the code
@Slf4j public class WebsocketSyncDataService implements SyncDataService, AutoCloseable { private final List<WebSocketClient> clients = new ArrayList<>(); private final ScheduledThreadPoolExecutor executor; /** * Instantiates a new Websocket sync cache. * * @param websocketConfig the websocket config * @param pluginDataSubscriber the plugin data subscriber * @param metaDataSubscribers the meta data subscribers * @param authDataSubscribers the auth data subscribers */ public WebsocketSyncDataService(final WebsocketConfig websocketConfig, final PluginDataSubscriber pluginDataSubscriber, final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) { String[] urls = StringUtils.split(websocketConfig.getUrls(), ", "); executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true)); for (String url : urls) { try { clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers)); } catch (URISyntaxException e) { log.error("websocket url({}) is error", url, e); } } try { for (WebSocketClient client : clients) { boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS); if (success) { log.info("websocket connection is successful....." ); } else { log.error("websocket connection is error....." ); } executor.scheduleAtFixedRate(() -> { try { if (client.isClosed()) { boolean reconnectSuccess = client.reconnectBlocking(); if (reconnectSuccess) { log.info("websocket reconnect is successful....." ); } else { log.error("websocket reconnection is error....." ); } } } catch (InterruptedException e) { log.error("websocket connect is error :{}", e.getMessage()); } }, 10, 30, TimeUnit.SECONDS); } /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80))); */ } catch (InterruptedException e) { log.info("websocket connection... exception...." , e); } } @Override public void close() { for (WebSocketClient client : clients) { if (! client.isClosed()) { client.close(); } } if (Objects.nonNull(executor)) { executor.shutdown(); }}}Copy the code
Yml/application-local.yml/application-local.yml/application-local.yml/application-local.yml
/** * @serverendpoint annotation is a class-level annotation, Its main function is to define the current class as a WebSocket server, * The annotated value will be used to listen for the user's connection to the terminal access URL, which the client can connect to the WebSocket server */ @slf4j@serverendpoint ("/ WebSocket ") public class WebsocketCollector { private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>(); private static final String SESSION_KEY = "sessionKey"; /** * The method to call when the connection was successfully established ** @param session Optional argument. Session indicates the connection session with a client. / @onOpen public void OnOpen (final Session Session) {log.info("websocket on open successful....") ); SESSION_SET.add(session); } /** * Method called after receiving client messages ** @param Message Message sent from the client * @param Session optional parameter * @throws Exception */ @onMessage public void onMessage(final String message, final Session session) { if (message.equals(DataEventTypeEnum.MYSELF.name())) { try { ThreadLocalUtil.put(SESSION_KEY, session); SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF); } finally { ThreadLocalUtil.clear(); Public void OnClose (final Session Session) {session_set.remove (Session); ThreadLocalUtil.clear(); } @param session @param error @onError public void OnError (final session session, final Throwable error) { SESSION_SET.remove(session); ThreadLocalUtil.clear(); log.error("websocket collection error: ", error); } /** * Send. * * @param message the message * @param type the type */ public static void send(final String message, final DataEventTypeEnum type) { if (StringUtils.isNotBlank(message)) { if (DataEventTypeEnum.MYSELF == type) { try { Session session = (Session) ThreadLocalUtil.get(SESSION_KEY); if (session ! = null) { session.getBasicRemote().sendText(message); } } catch (IOException e) { log.error("websocket send result is exception: ", e); } return; } for (Session session : SESSION_SET) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("websocket send result is exception: ", e); } } } } }Copy the code
When a WebSocket connection is established, the onMessage method of the WebsocketCollector is called to place the session information in the local cache. SyncDataService. SyncAll (DataEventTypeEnum.MYSELF) is called to synchronize all plug-in, selector and selector rule data information in DB.
SyncDataService:
@Override
public boolean syncAll(final DataEventTypeEnum type) {
appAuthService.syncData();
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
metaDataService.syncData();
return true;
}
Copy the code
3, our full text query DataChangedEvent listener, found the DataChangedEventDispatcher. When sending DataChangedEvent events, will perform DataChangedEventDispatcher listener method, relevant methods for WebsocketDataChangedListener changes. Execute the Send method of the WebsocketCollector to store the plugin, Selector, and Rule in the THREAD_CONTEXT of the ThreadLocalUtil class.
@Component public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean { private ApplicationContext applicationContext; private List<DataChangedListener> listeners; public DataChangedEventDispatcher(final ApplicationContext applicationContext) { this.applicationContext = applicationContext; } @Override @SuppressWarnings("unchecked") public void onApplicationEvent(final DataChangedEvent event) { for (DataChangedListener listener : listeners) { switch (event.getGroupKey()) { case APP_AUTH: listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType()); break; case PLUGIN: listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType()); break; case RULE: listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType()); break; case SELECTOR: listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType()); break; case META_DATA: listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType()); break; default: throw new IllegalStateException("Unexpected value: " + event.getGroupKey()); } } } @Override public void afterPropertiesSet() { Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values(); this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans)); }}Copy the code
public class WebsocketDataChangedListener implements DataChangedListener { @Override public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) { WebsocketData<PluginData> websocketData = new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType); } @Override public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) { WebsocketData<SelectorData> websocketData = new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType); } @Override public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) { WebsocketData<RuleData> configData = new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); } @Override public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) { WebsocketData<AppAuthData> configData = new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); } @Override public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) { WebsocketData<MetaData> configData = new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList); WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType); }}Copy the code
public static void send(final String message, final DataEventTypeEnum type) { if (StringUtils.isNotBlank(message)) { if (DataEventTypeEnum.MYSELF == type) { try { Session session = (Session) ThreadLocalUtil.get(SESSION_KEY); if (session ! = null) { session.getBasicRemote().sendText(message); } } catch (IOException e) { log.error("websocket send result is exception: ", e); } return; } for (Session session : SESSION_SET) { try { session.getBasicRemote().sendText(message); } catch (IOException e) { log.error("websocket send result is exception: ", e); }}}}Copy the code
public class ThreadLocalUtil { private static final ThreadLocal<Map<String, Object>> THREAD_CONTEXT = ThreadLocal.withInitial(HashMap::new); /** * save thread variable. * * @param key put key * @param value put value */ public static void put(final String key, final Object value) { THREAD_CONTEXT.get().put(key, value); } /** * remove thread variable. * * @param key remove key */ public static void remove(final String key) { THREAD_CONTEXT.get().remove(key); } /** * get thread variables. * * @param key get key * @return the Object */ public static Object get(final String key) { return THREAD_CONTEXT.get().get(key); } /** * remove all variables. */ public static void clear() { THREAD_CONTEXT.remove(); }}Copy the code
4. When we modify the corresponding Plugin(plug-in), SelectorList(selector) and RulesList(selector configuration rule) in sole-admin background, besides saving or updating DB, we also send DataChangedEvent event to execute the above process operation. Save the local ThreadLocal cache THREAD_CONTEXT.
@PutMapping("/{id}")
public SoulAdminResult updatePlugin(@PathVariable("id") final String id, @RequestBody final PluginDTO pluginDTO) {
Objects.requireNonNull(pluginDTO);
pluginDTO.setId(id);
final String result = pluginService.createOrUpdate(pluginDTO);
if (StringUtils.isNoneBlank(result)) {
return SoulAdminResult.error(result);
}
return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS);
}
Copy the code
@Override
@Transactional(rollbackFor = Exception.class)
public String createOrUpdate(final PluginDTO pluginDTO) {
final String msg = checkData(pluginDTO);
if (StringUtils.isNoneBlank(msg)) {
return msg;
}
PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);
DataEventTypeEnum eventType = DataEventTypeEnum.CREATE;
if (StringUtils.isBlank(pluginDTO.getId())) {
pluginMapper.insertSelective(pluginDO);
} else {
eventType = DataEventTypeEnum.UPDATE;
pluginMapper.updateSelective(pluginDO);
}
// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, eventType,
Collections.singletonList(PluginTransfer.INSTANCE.mapToData(pluginDO))));
return StringUtils.EMPTY;
}
Copy the code
Now that we know how websocket is used, we have made many local Plugin, SelectorList and RulesList changes to sole-bootstrap and soul-admin. The situation is the same as what we have seen above. The screenshot of debug will not be carried out here. If you are interested, you can carry out debugging according to the above process.