Introduction
This article explores the Soul gateway Admin’s Websocket data synchronization process
Overview
Start the sample with Websocket synchronization
Following the approach used in the earlier Zookeeper and Nacos data synchronization analyses, find the Websocket event listener class, set a breakpoint on it, and debug to observe the initialization process
Then change the status of a plug-in in the Admin console and debug to observe how the data change is handled
Initialization turns out to start from the familiar syncAll, and change handling starts from the Controllers; both are documented in the source code debugging section
Running the sample
Start database:
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
Run soul-admin and set the data synchronization mode to Websocket
soul:
  sync:
    websocket:
      enabled: true
Run soul-bootstrap with Websocket synchronization configured
soul:
  # enable websocket data synchronization
  sync:
    websocket:
      urls: ws://localhost:9095/websocket
Run soul-example-http to register some data for Debug testing
Source code debugging
Initialization Process
First, based on previous experience, look in the soul-admin module under the listener.websocket package for the corresponding processing class: the Websocket event listener WebsocketDataChangedListener
We find the plug-in change handling function, set a breakpoint on it, and restart Admin
Once the breakpoint is hit, we can see that the function below simply wraps the data in the Websocket message format and then sends it
public class WebsocketDataChangedListener implements DataChangedListener {
    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
}
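To make the shape of the outgoing message more concrete, here is a minimal sketch that builds a similar envelope by hand. The JSON field names groupType, eventType and data are assumptions inferred from the constructor arguments above, EnvelopeSketch is a hypothetical class, and the plug-in name is only an example value; the real code serializes WebsocketData with GsonUtils.

```java
import java.util.List;
import java.util.stream.Collectors;

public class EnvelopeSketch {
    // Hypothetical stand-in for serializing WebsocketData<PluginData>;
    // the field names groupType/eventType/data are assumed, not confirmed.
    static String toJson(String groupType, String eventType, List<String> pluginNames) {
        String data = pluginNames.stream()
                .map(name -> "{\"name\":\"" + name + "\"}")
                .collect(Collectors.joining(","));
        return "{\"groupType\":\"" + groupType + "\","
                + "\"eventType\":\"" + eventType + "\","
                + "\"data\":[" + data + "]}";
    }

    public static void main(String[] args) {
        // The group tells the receiver which cache to touch,
        // the event type tells it how (create, update, delete, ...).
        System.out.println(toJson("PLUGIN", "UPDATE", List.of("rate_limiter")));
    }
}
```

The point of the envelope is that a single Websocket channel can carry plug-in, selector, rule and metadata changes, and the receiver can route each message by its group and event type.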
PS: One small detail here: if no Bootstrap is running and no event is published, the breakpoint is never hit
Following the call stack, we arrive at the familiar event dispatcher from the previous articles, and continue from here
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                // Plugin
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
}
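The dispatch pattern above can be reduced to a small, self-contained sketch: one event is offered to every registered listener, and the group key selects which callback is invoked. All types here (Group, Listener, DispatchSketch) are hypothetical, trimmed-down stand-ins and not the Soul classes.

```java
import java.util.ArrayList;
import java.util.List;

public class DispatchSketch {
    // Hypothetical, reduced versions of ConfigGroupEnum and DataChangedListener.
    enum Group { PLUGIN, RULE }

    interface Listener {
        default void onPluginChanged(List<String> data) { }
        default void onRuleChanged(List<String> data) { }
    }

    // Every listener sees every event; the group decides the callback.
    static void dispatch(Group group, List<String> data, List<Listener> listeners) {
        for (Listener listener : listeners) {
            switch (group) {
                case PLUGIN:
                    listener.onPluginChanged(data);
                    break;
                case RULE:
                    listener.onRuleChanged(data);
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + group);
            }
        }
    }

    // Registers two listeners that only care about plug-in changes and fires one event.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Listener websocket = new Listener() {
            @Override
            public void onPluginChanged(List<String> data) {
                log.add("websocket:" + data);
            }
        };
        Listener zookeeper = new Listener() {
            @Override
            public void onPluginChanged(List<String> data) {
                log.add("zookeeper:" + data);
            }
        };
        dispatch(Group.PLUGIN, List.of("p1"), List.of(websocket, zookeeper));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This is why the entry points look the same across the Zookeeper, Nacos and Websocket analyses: only the listener implementation at the end of the chain differs.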
Next we reach the familiar full data synchronization function: it reads all the data from the database and then publishes events to synchronize it
public class SyncDataServiceImpl implements SyncDataService {
    @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        appAuthService.syncData();
        List<PluginData> pluginDataList = pluginService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        List<SelectorData> selectorDataList = selectorService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        List<RuleData> ruleDataList = ruleService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        metaDataService.syncData();
        return true;
    }
}
If you have written Websocket code before, the following function will look very familiar: it receives a message and then calls the processing logic
During debugging, we see that the value of message is "MYSELF", so we assume the initial communication convention of the Websocket is that, on receiving "MYSELF", Admin synchronizes the full data to the Bootstrap
public class WebsocketCollector {
    @OnMessage
    public void onMessage(final String message, final Session session) {
        // message == MYSELF
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            try {
                // Save the client Session information
                ThreadLocalUtil.put(SESSION_KEY, session);
                SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
            } finally {
                ThreadLocalUtil.clear();
            }
        }
    }
}
Going back to the Websocket send function, there are two things to note:
1. When the event type is MYSELF, the message is sent only to one specific client
In the onMessage function above, the session is saved in a ThreadLocal; here it is retrieved again and used to send the message
This is presumably for a newly connected Bootstrap, which needs the full data sent to it
2. If the event type is not MYSELF, the message is sent to all clients
This is presumably the data change case, where the change is synchronized to every client
public class WebsocketCollector {
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            if (DataEventTypeEnum.MYSELF == type) {
                Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
                if (session != null) {
                    sendMessageBySession(session, message);
                }
            } else {
                SESSION_SET.forEach(session -> sendMessageBySession(session, message));
            }
        }
    }
}
Here ThreadLocal is used to pick out the target client, yet SESSION_KEY is the same for every client. This is a bit puzzling at first, so we will look into it later
In the past, we handled this kind of client-specific processing directly in OnMessage, or gave each client an ID when it connected
This approach is new to me and worth learning from
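One way to see why a shared SESSION_KEY need not be a problem: values stored through a ThreadLocal are private to the thread that stored them, so two handler threads using the same key never see each other's session. The sketch below demonstrates only that property. It assumes ThreadLocalUtil wraps a per-thread map, which is a guess about its implementation, and it uses plain strings in place of real Websocket sessions; every name in it is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ThreadLocalSessionSketch {
    static final String SESSION_KEY = "sessionKey";

    // Assumed shape of ThreadLocalUtil: one private map per thread.
    static final ThreadLocal<Map<String, Object>> LOCAL = ThreadLocal.withInitial(HashMap::new);

    // Records, per handler thread, which session the full data was "sent" to.
    static final Map<String, String> SENT_TO = new ConcurrentHashMap<>();

    // Mimics onMessage for MYSELF: store the session, send, then clear.
    static void handleMyself(String sessionId) {
        try {
            LOCAL.get().put(SESSION_KEY, sessionId);
            // Deeper in the same call stack, send() looks the session up again.
            String target = (String) LOCAL.get().get(SESSION_KEY);
            SENT_TO.put(Thread.currentThread().getName(), target);
        } finally {
            LOCAL.remove();
        }
    }

    // Runs two "clients" on separate threads that share the same key.
    static Map<String, String> demo() {
        SENT_TO.clear();
        Thread a = new Thread(() -> handleMyself("session-A"), "client-A");
        Thread b = new Thread(() -> handleMyself("session-B"), "client-B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return SENT_TO;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each thread reads back exactly the session it stored. The trick only works while the lookup happens on the same thread that stored the value, which appears to be the case here because syncAll is called directly from onMessage.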
Data changes
With the initial data synchronization finished, we change the status of the rate-limiting plug-in in the Admin console, which triggers the function where we set the breakpoint at the beginning:
public class WebsocketDataChangedListener implements DataChangedListener {
    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
}
Tracing back up the call stack, we skip over the event dispatching and return to the familiar PluginServiceImpl, where the plug-in data is updated and the event is published
public class PluginServiceImpl implements PluginService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String createOrUpdate(final PluginDTO pluginDTO) {
        final String msg = checkData(pluginDTO);
        if (StringUtils.isNoneBlank(msg)) {
            return msg;
        }
        PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);
        DataEventTypeEnum eventType = DataEventTypeEnum.CREATE;
        if (StringUtils.isBlank(pluginDTO.getId())) {
            pluginMapper.insertSelective(pluginDO);
        } else {
            eventType = DataEventTypeEnum.UPDATE;
            pluginMapper.updateSelective(pluginDO);
        }
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, eventType,
                Collections.singletonList(PluginTransfer.INSTANCE.mapToData(pluginDO))));
        return StringUtils.EMPTY;
    }
}
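The create-or-update decision in the function above boils down to one rule: a blank id means the record does not exist yet, so the published event is CREATE; otherwise it is UPDATE. A minimal sketch of that rule follows. EventTypeSketch and its enum are hypothetical, and the blank check only approximates StringUtils.isBlank.

```java
public class EventTypeSketch {
    // Hypothetical stand-in for the two DataEventTypeEnum values used above.
    enum EventType { CREATE, UPDATE }

    // A null, empty or whitespace-only id is treated as "not yet persisted".
    static EventType eventTypeFor(String id) {
        boolean blank = id == null || id.trim().isEmpty();
        return blank ? EventType.CREATE : EventType.UPDATE;
    }

    public static void main(String[] args) {
        System.out.println(eventTypeFor(null)); // CREATE
        System.out.println(eventTypeFor("42")); // UPDATE
    }
}
```

Since the controller sets the id from the URL path before calling the service, changing a plug-in's status in the console ends up on the UPDATE branch.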
Continuing to trace upwards, we reach the Controller interface, which is where the update event is triggered
@RestController
@RequestMapping("/plugin")
public class PluginController {
    @PutMapping("/{id}")
    public SoulAdminResult updatePlugin(@PathVariable("id") final String id, @RequestBody final PluginDTO pluginDTO) {
        Objects.requireNonNull(pluginDTO);
        pluginDTO.setId(id);
        final String result = pluginService.createOrUpdate(pluginDTO);
        if (StringUtils.isNoneBlank(result)) {
            return SoulAdminResult.error(result);
        }
        return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS);
    }
}
Conclusion
This article made a preliminary exploration of how Websocket data synchronization is processed on the Admin side. The flow can be roughly divided into initialization and data update (including deletion)
- Data initialization: Bootstrap sends the MYSELF message to trigger a full data synchronization
- Data change processing (listener class WebsocketDataChangedListener)
  - HTTP interface call: the caller can be the management console or a service registration client
  - Service invocation: updates the data in the database and calls the event publishing interface
  - Publish events: events are published to the data synchronization listeners
  - Data update: after receiving the event, each listener performs its update (Websocket push, Zookeeper write, HTTP MD5 update, Nacos write)
Across the three articles on Zookeeper, Nacos and Websocket, the processing flow turns out to be basically the same. The initialization entry point, the event trigger and the dispatching are identical to the previous articles; only the specific sending logic differs
This shows that the code is very clearly structured