Introduction

In this article, we explore the details of Websocket data synchronization.

Running the sample

Start database:

docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest

Run soul-admin first.

Configure soul-bootstrap to use websocket synchronization, then start soul-bootstrap:

corss:
  enabled: true
dubbo:
  parameter: multi
sync:
  websocket:
    urls: ws://localhost:9095/websocket

Run soul-example-http to register some data for debugging.

Debugging the source code

From the previous analysis, we know the core class that handles data updates. Its core code is as follows:

public class CommonPluginDataSubscriber implements PluginDataSubscriber {

    // ...

    // The naming here is slightly flawed: onSubscribe is not a very good name, it is a little too general
    @Override
    public void onSubscribe(final PluginData pluginData) {
        subscribeDataHandler(pluginData, DataEventTypeEnum.UPDATE);
    }
    
    @Override
    public void unSubscribe(final PluginData pluginData) {
        subscribeDataHandler(pluginData, DataEventTypeEnum.DELETE);
    }

    @Override
    public void onSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.UPDATE);
    }
    
    @Override
    public void unSelectorSubscribe(final SelectorData selectorData) {
        subscribeDataHandler(selectorData, DataEventTypeEnum.DELETE);
    }

    @Override
    public void onRuleSubscribe(final RuleData ruleData) {
        subscribeDataHandler(ruleData, DataEventTypeEnum.UPDATE);
    }
    
    @Override
    public void unRuleSubscribe(final RuleData ruleData) {
        subscribeDataHandler(ruleData, DataEventTypeEnum.DELETE);
    }
    
    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
        Optional.ofNullable(classData).ifPresent(data -> {
            if (data instanceof PluginData) {
                PluginData pluginData = (PluginData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    // Update the plug-in configuration data
                    BaseDataCache.getInstance().cachePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    // Delete the plug-in configuration data
                    BaseDataCache.getInstance().removePluginData(pluginData);
                    Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));
                }
            } else if (data instanceof SelectorData) {
                SelectorData selectorData = (SelectorData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    // Update the selector data
                    BaseDataCache.getInstance().cacheSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    // Delete the selector data
                    BaseDataCache.getInstance().removeSelectData(selectorData);
                    Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));
                }
            } else if (data instanceof RuleData) {
                RuleData ruleData = (RuleData) data;
                if (dataType == DataEventTypeEnum.UPDATE) {
                    // Update the rule data
                    BaseDataCache.getInstance().cacheRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.handlerRule(ruleData));
                } else if (dataType == DataEventTypeEnum.DELETE) {
                    // Delete the rule data
                    BaseDataCache.getInstance().removeRuleData(ruleData);
                    Optional.ofNullable(handlerMap.get(ruleData.getPluginName())).ifPresent(handler -> handler.removeRule(ruleData));
                }
            }
        });
    }
}
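The pattern in subscribeDataHandler is "touch the cache first, then notify the plugin's handler if one is registered". Here is a minimal sketch of that idea for the plugin case only. All names in it (MiniSubscriber, PluginInfo, PluginHandler) are hypothetical stand-ins, not Soul's real classes, and a plain ConcurrentHashMap is assumed in place of BaseDataCache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins for illustration; not Soul's real classes.
class MiniSubscriber {

    enum EventType { UPDATE, DELETE }

    // Stand-in for PluginData: only the fields this sketch needs.
    static final class PluginInfo {
        final String name;
        final boolean enabled;

        PluginInfo(final String name, final boolean enabled) {
            this.name = name;
            this.enabled = enabled;
        }
    }

    // Stand-in for the per-plugin handler that is looked up in handlerMap.
    interface PluginHandler {
        void handlePlugin(PluginInfo plugin);

        void removePlugin(PluginInfo plugin);
    }

    // Assumed replacement for BaseDataCache: plugin name -> latest plugin data.
    final Map<String, PluginInfo> cache = new ConcurrentHashMap<>();

    // Handlers keyed by plugin name; a plugin without a handler is still cached.
    final Map<String, PluginHandler> handlerMap = new ConcurrentHashMap<>();

    void onSubscribe(final PluginInfo plugin) {
        dispatch(plugin, EventType.UPDATE);
    }

    void unSubscribe(final PluginInfo plugin) {
        dispatch(plugin, EventType.DELETE);
    }

    private void dispatch(final PluginInfo plugin, final EventType type) {
        if (plugin == null) {
            return;
        }
        Optional<PluginHandler> handler = Optional.ofNullable(handlerMap.get(plugin.name));
        if (type == EventType.UPDATE) {
            cache.put(plugin.name, plugin);
            handler.ifPresent(h -> h.handlePlugin(plugin));
        } else {
            cache.remove(plugin.name);
            handler.ifPresent(h -> h.removePlugin(plugin));
        }
    }

    // Runs one update followed by one delete and records what happened.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniSubscriber subscriber = new MiniSubscriber();
        subscriber.handlerMap.put("rate_limiter", new PluginHandler() {
            @Override
            public void handlePlugin(final PluginInfo plugin) {
                log.add("handle " + plugin.name + " enabled=" + plugin.enabled);
            }

            @Override
            public void removePlugin(final PluginInfo plugin) {
                log.add("remove " + plugin.name);
            }
        });
        subscriber.onSubscribe(new PluginInfo("rate_limiter", false));
        log.add("cached=" + subscriber.cache.containsKey("rate_limiter"));
        subscriber.unSubscribe(new PluginInfo("rate_limiter", false));
        log.add("cached=" + subscriber.cache.containsKey("rate_limiter"));
        return log;
    }

    public static void main(final String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because the cache is updated whether or not a handler exists, plugins that have no dedicated handler still end up with current data in the cache.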

Let's look at the plugin configuration data first: set breakpoints on onSubscribe and unSubscribe, then restart soul-bootstrap.

public class PluginDataHandler extends AbstractDataHandler<PluginData> {

    // Restore, executed on startup
    @Override
    protected void doRefresh(final List<PluginData> dataList) {
        // Clear the cached data
        pluginDataSubscriber.refreshPluginDataSelf(dataList);
        dataList.forEach(pluginDataSubscriber::onSubscribe);
    }

    // Update data (increment)
    @Override
    protected void doUpdate(final List<PluginData> dataList) {
        dataList.forEach(pluginDataSubscriber::onSubscribe);
    }

    // Delete data
    @Override
    protected void doDelete(final List<PluginData> dataList) {
        dataList.forEach(pluginDataSubscriber::unSubscribe);
    }
}

Within the websocket synchronization module, this class is the only place that calls CommonPluginDataSubscriber for plugin data. It has three operations: refresh (the restore on startup), update, and delete.

Inspecting the values in dataList, we find all of our plugin data. Since this is important, we continue tracing where it comes from.

Following the call stack, we reach the class below, whose handle method calls the three methods of the previous class.

public abstract class AbstractDataHandler<T> implements DataHandler {

    // Corresponding to the above three operations: restore, update, delete
    @Override
    public void handle(final String json, final String eventType) {
        List<T> dataList = convert(json);
        if (CollectionUtils.isNotEmpty(dataList)) {
            DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
            switch (eventTypeEnum) {
                case REFRESH:
                case MYSELF:
                    doRefresh(dataList);
                    break;
                case UPDATE:
                case CREATE:
                    doUpdate(dataList);
                    break;
                case DELETE:
                    doDelete(dataList);
                    break;
                default:
                    break;
            }
        }
    }
}

DataEventTypeEnum has the values DELETE, CREATE, UPDATE, REFRESH, and MYSELF, which map directly onto the cases of the switch above.

The dataList is converted from JSON, so we keep following the call stack.
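The shape of AbstractDataHandler is a template method: the base class owns the dispatch on the event type, and each subclass only supplies convert plus the three operations. Below is a rough sketch of that shape. MiniDataHandler, NameHandler, and HandlerDemo are hypothetical names, and a comma-separated string is assumed in place of the JSON payload so that the example needs no JSON library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method shape; not Soul's real classes.
class HandlerDemo {

    // Applies a refresh, a create, a delete, and an empty update in sequence.
    static List<String> run() {
        NameHandler handler = new NameHandler();
        handler.handle("divide,sign", "REFRESH");
        handler.handle("rate_limiter", "CREATE");
        handler.handle("sign", "DELETE");
        handler.handle("", "UPDATE"); // empty payload: ignored by the base class
        return handler.names;
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}

abstract class MiniDataHandler<T> {

    enum EventType { REFRESH, MYSELF, UPDATE, CREATE, DELETE }

    // Subclasses decide how the raw payload becomes typed objects.
    protected abstract List<T> convert(String payload);

    protected abstract void doRefresh(List<T> dataList);

    protected abstract void doUpdate(List<T> dataList);

    protected abstract void doDelete(List<T> dataList);

    // The base class owns the dispatch, so every data type follows the same rules.
    public final void handle(final String payload, final String eventType) {
        List<T> dataList = convert(payload);
        if (dataList.isEmpty()) {
            return;
        }
        switch (EventType.valueOf(eventType)) {
            case REFRESH:
            case MYSELF:
                doRefresh(dataList);
                break;
            case UPDATE:
            case CREATE:
                doUpdate(dataList);
                break;
            case DELETE:
                doDelete(dataList);
                break;
            default:
                break;
        }
    }
}

class NameHandler extends MiniDataHandler<String> {

    final List<String> names = new ArrayList<>();

    @Override
    protected List<String> convert(final String payload) {
        // Assumption: a comma-separated list stands in for the JSON array.
        List<String> result = new ArrayList<>();
        for (String part : payload.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    @Override
    protected void doRefresh(final List<String> dataList) {
        // A refresh replaces everything, mirroring "clear the cache, then re-add".
        names.clear();
        names.addAll(dataList);
    }

    @Override
    protected void doUpdate(final List<String> dataList) {
        for (String name : dataList) {
            if (!names.contains(name)) {
                names.add(name);
            }
        }
    }

    @Override
    protected void doDelete(final List<String> dataList) {
        names.removeAll(dataList);
    }
}
```

In this sketch, REFRESH and MYSELF share the full-replace path while UPDATE and CREATE share the incremental path, which is the same grouping the switch in handle uses.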

public class WebsocketDataHandler {

    public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
        ENUM_MAP.get(type).handle(json, eventType);
    }
}

Pay a little attention to the type and eventType parameters of this method. Looking at ConfigGroupEnum, its values are APP_AUTH, PLUGIN, RULE, SELECTOR, and META_DATA. This reveals some synchronized data that we overlooked earlier: besides plugin configuration data, selector data, and rule data, there are also APP_AUTH data and META_DATA (which seems to be RPC-related). We will test these later.

Continuing up the stack, we reach the Websocket layer. It is basically the same as in the previous article's analysis: a Websocket client receives a message, and that triggers the call chain.
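The executor method is a table lookup: one handler per group type, selected from an enum-keyed map. A small sketch of that dispatch follows, assuming hypothetical names (GroupDispatchDemo, Group, GroupHandler, Dispatcher) rather than Soul's real types. Unlike the one-line original, the sketch throws a descriptive exception when no handler is registered; that check is an addition for illustration.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of enum-keyed dispatch; not Soul's real classes.
class GroupDispatchDemo {

    enum Group { PLUGIN, SELECTOR, RULE, APP_AUTH, META_DATA }

    interface GroupHandler {
        void handle(String json, String eventType);
    }

    static final class Dispatcher {

        // EnumMap is a compact, ordered map specialised for enum keys.
        private final Map<Group, GroupHandler> handlers = new EnumMap<>(Group.class);

        void register(final Group group, final GroupHandler handler) {
            handlers.put(group, handler);
        }

        void executor(final Group type, final String json, final String eventType) {
            GroupHandler handler = handlers.get(type);
            if (handler == null) {
                // Added for the sketch: fail with a clear message instead of a NullPointerException.
                throw new IllegalStateException("no handler registered for " + type);
            }
            handler.handle(json, eventType);
        }
    }

    // Registers a logging handler for every group, then routes two messages.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Dispatcher dispatcher = new Dispatcher();
        for (Group group : Group.values()) {
            dispatcher.register(group, (json, eventType) -> log.add(group + ":" + eventType + ":" + json));
        }
        dispatcher.executor(Group.PLUGIN, "[]", "REFRESH");
        dispatcher.executor(Group.RULE, "[{\"id\":\"1\"}]", "DELETE");
        return log;
    }

    public static void main(final String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this layout, supporting a new data type means adding one enum constant and registering one handler; the dispatch code itself does not change.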

@Slf4j
public final class SoulWebsocketClient extends WebSocketClient {
    
    @Override
    public void onMessage(final String result) {
        handleResult(result);
    }
    
    @SuppressWarnings("ALL")
    private void handleResult(final String result) {
        // Parse the message envelope sent by soul-admin
        WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
        ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
        String eventType = websocketData.getEventType();
        // Re-serialize the data part so the matching handler can convert it to its own type
        String json = GsonUtils.getInstance().toJson(websocketData.getData());
        websocketDataHandler.executor(groupEnum, json, eventType);
    }
}

That basically covers the update flow during initialization.

Next, we test a modification. In the admin console, we disable the rate_limiter plugin. Right after the change, execution stops at the breakpoint, and the received data is as follows:

{"groupType":"PLUGIN","eventType":"UPDATE","data":[{"id":"4","name":"rate_limiter","config":"{\"mode\":\"standalone\",\"master\":\"mymaster\",\"url\":\"127.0.0.1:6379\"}","role":1,"enabled":false}]}

You can see the key fields: groupType, eventType, and data.

We then test deletion. The flow is basically the same; only the event type changes.

The other group types were tested afterwards: PLUGIN, RULE, SELECTOR, and META_DATA. I was not sure how to trigger APP_AUTH, so it was not tested.

Overall, the flow is relatively simple and clear, so let's go straight to the summary.

Conclusion

In general, Websocket synchronization is relatively simple, but it can be confusing if you have never used Websocket. Because I use Websocket at work, the data-reading part was easy for me to follow. If you find it confusing, search for a Spring WebSocket tutorial and experiment with it yourself.

Summary of today’s Websocket synchronization:

As shown in the figure above, the data update process is as follows:

  • 1. SoulWebsocketClient: receives messages from soul-admin; each message carries the data type, the operation type, and the data itself
  • 2. WebsocketDataHandler: based on the ConfigGroupEnum (data type), calls the corresponding data handler, which in turn calls the subscriber
  • 3. Subscriber: calls the cache for the corresponding data type (BaseDataCache) to update or delete the data

Soul Gateway source code analysis article list

Github

  • Soul Gateway source code reading (1): overview
  • Soul Gateway source code reading (2): first run of the code
  • Soul Gateway source code reading (3): HTTP request processing overview
  • Soul Gateway source code reading (4): Dubbo request overview
  • Soul Gateway source code reading (5): request type exploration
  • Soul Gateway source code reading (6): Sofa request processing overview
  • Soul Gateway source code reading (7): rate-limiting plugin exploration
  • Soul Gateway source code reading (8): route matching exploration
  • Soul Gateway source code reading (9): plugin configuration loading, a first look
  • Soul Gateway source code reading (10): writing a simple custom plugin
  • Soul Gateway source code reading (11): request processing summary
  • Soul Gateway source code reading (12): data synchronization, a first look (Bootstrap side)
  • Soul Gateway source code reading (13): Websocket data synchronization (Bootstrap side)
  • Soul Gateway source code reading (14): HTTP data synchronization (Bootstrap side)
  • HTTP parameter request error
