Introduction
In this article, we explore the details of WebSocket data synchronization.
Running the example
Start the database:
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
Run soul-admin first
Configure soul-bootstrap to use WebSocket synchronization, then run soul-bootstrap:
corss:
  enabled: true
dubbo:
  parameter: multi
sync:
  websocket:
    urls: ws://localhost:9095/websocket
Run soul-example-http to register some data for debugging.
Debugging the source code
From the previous analysis, we know the core class for data updates. Its core code is as follows:
public class CommonPluginDataSubscriber implements PluginDataSubscriber {

    ...

    // The naming is slightly flawed: onSubscribe is not a great name,
    // it is a little too general for a callback that handles plugin data.
    @Override
    public void onSubscribe(final PluginData pluginData) {
        subscribeDataHandler(pluginData, DataEventTypeEnum.UPDATE);
    }

    @Override
    public void unSubscribe(final PluginData pluginData) {
        subscribeDataHandler(pluginData, DataEventTypeEnum.DELETE);
    }

    @Override
    public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
    }

    @Override
    public void unSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE);
    }

    @Override
    public void onRuleSubscribe(final RuleData ruleData) {
        subscribeDataHandler(ruleData, DataEventTypeEnum.UPDATE);
    }

    @Override
    public void unRuleSubscribe(final RuleData ruleData) {
        subscribeDataHandler(ruleData, DataEventTypeEnum.DELETE);
    }

    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    // Update the plugin configuration data
                    BaseDataCache.getInstance().cachePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    // Delete the plugin configuration data
                    BaseDataCache.getInstance().removePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
                }
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    // Update the selector data
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    // Delete the selector data
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {
                RuleData ruleData = (RuleData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    // Update the rule data
                    BaseDataCache.getInstance().cacheRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    // Delete the rule data
                    BaseDataCache.getInstance().removeRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
                }
            }
        });
    }
}
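The update and delete branches of the subscriber follow one pattern: write to (or remove from) the cache first, then notify the handler registered under the plugin name, if there is one. Here is a minimal sketch of that pattern. The names PluginInfo, PluginCallback and SketchSubscriber are hypothetical stand-ins, not Soul classes, and the cache is reduced to a plain map.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for PluginData: only a name and an enabled flag.
final class PluginInfo {
    final String name;
    final boolean enabled;

    PluginInfo(String name, boolean enabled) {
        this.name = name;
        this.enabled = enabled;
    }
}

// Hypothetical per-plugin callback, loosely modelled on handlerPlugin/removePlugin.
interface PluginCallback {
    void onUpdate(PluginInfo plugin);

    void onRemove(PluginInfo plugin);
}

// Sketch of a subscriber: touch the cache first, then notify a handler if one is registered.
final class SketchSubscriber {
    private final Map<String, PluginInfo> cache = new ConcurrentHashMap<>();
    private final Map<String, PluginCallback> handlers = new ConcurrentHashMap<>();

    void register(String pluginName, PluginCallback callback) {
        handlers.put(pluginName, callback);
    }

    void onSubscribe(PluginInfo plugin) {
        if (plugin == null) {
            return; // same effect as the Optional.ofNullable guard in the listing
        }
        cache.put(plugin.name, plugin);
        Optional.ofNullable(handlers.get(plugin.name)).ifPresent(h -> h.onUpdate(plugin));
    }

    void unSubscribe(PluginInfo plugin) {
        if (plugin == null) {
            return;
        }
        cache.remove(plugin.name);
        Optional.ofNullable(handlers.get(plugin.name)).ifPresent(h -> h.onRemove(plugin));
    }

    Optional<PluginInfo> cached(String pluginName) {
        return Optional.ofNullable(cache.get(pluginName));
    }
}
```

A plugin without a registered handler is still cached; only the notification step is skipped, which is what the Optional lookup on handlerMap does in the listing.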
Let’s look at the plugin configuration data first: set breakpoints on onSubscribe and unSubscribe, then restart soul-bootstrap. The breakpoint is reached from the following class:
public class PluginDataHandler extends AbstractDataHandler<PluginData> {

    // Refresh (restore); executed on startup
    @Override
    protected void doRefresh(final List<PluginData> dataList) {
        // Clear the cached data
        pluginDataSubscriber.refreshPluginDataSelf(dataList);
        dataList.forEach(pluginDataSubscriber::onSubscribe);
    }

    // Update data (incremental)
    @Override
    protected void doUpdate(final List<PluginData> dataList) {
        dataList.forEach(pluginDataSubscriber::onSubscribe);
    }

    // Delete data
    @Override
    protected void doDelete(final List<PluginData> dataList) {
        dataList.forEach(pluginDataSubscriber::unSubscribe);
    }
}
This class is the only one in the WebSocket synchronization module that calls CommonPluginDataSubscriber for plugin data. It has three operations: refresh (restore), update, and delete.
Inspecting the values in dataList, we find all of our plugin-related data. This is important, so we keep tracing where it comes from.
Following the call stack, we reach the method below, which calls the three methods of the previous class:
public abstract class AbstractDataHandler<T> implements DataHandler {

    // Dispatches to the three operations above: refresh, update, delete
    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }
}
DataEventTypeEnum has the values DELETE, CREATE, UPDATE, REFRESH, and MYSELF, which map directly onto the cases of the switch above.
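The grouping of event types into three operations can be sketched as follows. Only the five event names come from the article; EventType, SketchDataHandler and the "id=value" entry format are hypothetical simplifications that avoid JSON parsing. As a further simplification, a refresh here clears the whole store before repopulating it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// The five event names listed in the article.
enum EventType { DELETE, CREATE, UPDATE, REFRESH, MYSELF }

// Sketch of a handler that maps five event types onto three operations.
final class SketchDataHandler {
    private final Map<String, String> store = new LinkedHashMap<>();

    // Each entry is an "id=value" string; a bare "id" is enough for DELETE.
    void handle(List<String> entries, String eventType) {
        if (entries == null || entries.isEmpty()) {
            return;
        }
        // valueOf throws IllegalArgumentException for an unknown event name.
        switch (EventType.valueOf(eventType)) {
            case REFRESH:
            case MYSELF:
                store.clear();                 // full refresh: drop stale entries first
                entries.forEach(this::put);
                break;
            case UPDATE:
            case CREATE:
                entries.forEach(this::put);    // incremental: other entries stay untouched
                break;
            case DELETE:
                entries.forEach(e -> store.remove(key(e)));
                break;
            default:
                break;
        }
    }

    // Returns a copy so callers cannot modify the internal store.
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(store);
    }

    private void put(String entry) {
        int i = entry.indexOf('=');
        store.put(key(entry), i < 0 ? "" : entry.substring(i + 1));
    }

    private static String key(String entry) {
        int i = entry.indexOf('=');
        return i < 0 ? entry : entry.substring(0, i);
    }
}
```

CREATE and UPDATE share a branch because, for a cache keyed by id, inserting and overwriting are the same operation.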
The dataList is converted from the incoming JSON, so we keep following the call stack:
public class WebsocketDataHandler {

    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}
The type and eventType parameters of this method deserve a little attention. ConfigGroupEnum has the values APP_AUTH, PLUGIN, RULE, SELECTOR, and META_DATA, which reveals some synchronized data we overlooked earlier: besides plugin configuration data, selector data, and rule data, there is also APP_AUTH data and META_DATA (which seems to be RPC-related). We will test these later.
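Routing by group type can be sketched with an EnumMap from group to handler. The five group names come from the article; ConfigGroup, GroupHandler, RecordingHandler and SketchDispatcher are hypothetical names. Unlike the one-line executor above, which would throw a NullPointerException for a group with no entry in the map, this sketch returns false for an unregistered group. That is a deliberate difference, not a description of Soul's behaviour.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// The five group names listed in the article.
enum ConfigGroup { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

// Hypothetical handler contract: a JSON payload plus the event type name.
interface GroupHandler {
    void handle(String json, String eventType);
}

// Test double that records what it was asked to handle.
final class RecordingHandler implements GroupHandler {
    final List<String> seen = new ArrayList<>();

    @Override
    public void handle(String json, String eventType) {
        seen.add(eventType + ":" + json);
    }
}

// Sketch of the dispatcher: one handler per group, looked up in an EnumMap.
final class SketchDispatcher {
    private final Map<ConfigGroup, GroupHandler> handlers = new EnumMap<>(ConfigGroup.class);

    void register(ConfigGroup group, GroupHandler handler) {
        handlers.put(group, handler);
    }

    // Returns true if a handler was found and invoked.
    boolean dispatch(ConfigGroup group, String json, String eventType) {
        GroupHandler handler = handlers.get(group);
        if (handler == null) {
            return false;
        }
        handler.handle(json, eventType);
        return true;
    }
}
```

An EnumMap suits this case because the key set is a small, fixed enum, so adding a new kind of synchronized data only needs one new constant and one registration.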
Continuing up the stack, we reach the WebSocket side. As in the previous article’s analysis, the call is triggered when a WebSocket client receives a message:
@Slf4j
public final class SoulWebsocketClient extends WebSocketClient {

    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }

    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}
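If you want to experiment with the "message arrives, handler is triggered" step without the gateway, here is a minimal sketch using the JDK's built-in java.net.http.WebSocket (Java 11+). This is a swapped-in client API, not the WebSocketClient base class that SoulWebsocketClient extends, and ForwardingListener is a hypothetical name. The listener buffers text fragments and passes each complete message to a callback, which plays the role of handleResult.

```java
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

// Sketch: forwards each complete text message to a callback.
final class ForwardingListener implements WebSocket.Listener {
    private final StringBuilder buffer = new StringBuilder();
    private final Consumer<String> onMessage;

    ForwardingListener(Consumer<String> onMessage) {
        this.onMessage = onMessage;
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        accept(data, last);
        webSocket.request(1); // ask the connection for the next message
        return null;          // null means the data may be reclaimed immediately
    }

    // Kept separate from onText so the buffering can be tried without a connection.
    void accept(CharSequence data, boolean last) {
        buffer.append(data);
        if (last) {
            String message = buffer.toString();
            buffer.setLength(0);
            onMessage.accept(message);
        }
    }
}
```

Assuming soul-admin is listening on the URL from the configuration earlier, the listener could be attached with HttpClient.newHttpClient().newWebSocketBuilder().buildAsync(URI.create("ws://localhost:9095/websocket"), new ForwardingListener(System.out::println)), which returns a CompletableFuture<WebSocket>.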
That essentially covers the update flow during initialization.
Next, we test a modification. In the admin console, we disable the rate_limiter plugin. Immediately after the change, the breakpoint is hit, and the received data is as follows:
{"groupType":"PLUGIN","eventType":"UPDATE","data":[{"id":"4","name":"rate_limiter","config":"{\"mode\":\"standalone\",\"master\":\"mymaster\",\"url\":\"127.0.0.1:6379\"}","role":1,"enabled":false}]}
You can see the key fields: groupType, eventType, data, and so on.
Testing deletion next, the flow is essentially the same; only the event type changes.
The other group types were tested afterwards: PLUGIN, RULE, SELECTOR, and META_DATA. I was not sure how to trigger APP_AUTH, so it was not tested.
Overall, this is relatively simple and clear, so let’s go straight to the summary.
Conclusion
In general, WebSocket synchronization is relatively simple, although it can be confusing if you have not used WebSocket before. I use WebSocket at work, so the data-reading code was easy for me to follow. If you find it confusing, search for a Spring WebSocket tutorial and experiment with it yourself.
Summary of today’s WebSocket synchronization:
As shown in the figure above, the data update process is as follows:
- 1. SoulWebsocketClient: receives update data from soul-admin, carrying the data type, the operation type, and the specific information
- 2. WebsocketDataHandler: according to ConfigGroupEnum (the data type), calls the corresponding subscriber
- 3. Subscriber: the concrete subscriber calls the DataCache of the corresponding type to update or delete the data
Soul Gateway source code analysis article list
Github
- Soul Gateway source code reading (1): overview
- Soul Gateway source code reading (2): first run of the code
- Soul Gateway source code reading (3): HTTP request processing overview
- Soul Gateway source code reading (4): Dubbo request overview
- Soul Gateway source code reading (5): request type exploration
- Soul Gateway source code reading (6): Sofa request processing overview
- Soul Gateway source code reading (7): rate-limiting plugin exploration
- Soul Gateway source code reading (8): route matching exploration
- Soul Gateway source code reading (9): plugin configuration loading, a first look
- Soul Gateway source code reading (10): writing a simple custom plugin
- Soul Gateway source code reading (11): request processing summary
- Soul Gateway source code reading (12): data synchronization, a first look (Bootstrap side)
- Soul Gateway source code reading (13): WebSocket data synchronization (Bootstrap side)
- Soul Gateway source code reading (14): HTTP data synchronization (Bootstrap side)
- HTTP parameter request error