Introduction

This article explores the Websocket data synchronization process on the Soul gateway Admin side

Overview

Start the sample with Websocket synchronization enabled

Drawing on the earlier analysis of Zookeeper and Nacos data synchronization, find the Websocket event listener class, set a breakpoint in it, and debug to observe the initialization process

Then change the status of a plugin in the Admin console and debug to observe how the data change is processed

It turns out that initialization starts from the familiar syncAll, and change processing starts from the Controllers; both are traced in the source code debugging section

Running the sample

Start the database:

docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest

Run soul-admin with the data synchronization mode set to Websocket:

soul:
  sync:
    websocket:
      enabled: true

Run soul-bootstrap with Websocket synchronization enabled:

soul:
  # enable Websocket data synchronization
  sync:
    websocket:
      urls: ws://localhost:9095/websocket

Run soul-example-http to register some data for debugging

Source code debugging

Initialization Process

First, following the earlier experience, look in the listener.websocket package of the soul-admin module for the corresponding class: the Websocket event listener WebsocketDataChangedListener

Find the function that handles plugin changes, set a breakpoint on it, and restart Admin

Once the breakpoint is hit, we can see that the function below simply wraps the data in a WebsocketData object, serializes it to JSON, and sends it over Websocket

public class WebsocketDataChangedListener implements DataChangedListener {

    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
}
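
To make the "wrap and send" step concrete, here is a minimal sketch of what the serialized envelope might look like. It is not the Soul implementation: the JSON field names (groupType, eventType, data) and the single name property per plugin are assumptions for illustration, and the JSON is built by hand (without escaping) instead of with GsonUtils so that the example has no dependencies.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: builds an envelope with a group, an event type, and a data list.
class EnvelopeSketch {

    // Field names here are assumed, not taken from the Soul source.
    static String toJson(String groupType, String eventType, List<String> pluginNames) {
        String data = pluginNames.stream()
                .map(name -> "{\"name\":\"" + name + "\"}")
                .collect(Collectors.joining(","));
        return "{\"groupType\":\"" + groupType + "\","
                + "\"eventType\":\"" + eventType + "\","
                + "\"data\":[" + data + "]}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("PLUGIN", "UPDATE", List.of("rate_limiter")));
    }
}
```

The point of the envelope is that the receiving side can route on the group and event type before looking at the data list.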

PS: a small detail. If no Bootstrap is connected and no change event is published, the breakpoint is never hit

Following the call stack, we arrive at the event dispatcher familiar from the previous article and continue from there

public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                // Plugin
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
}

Next is the familiar full data synchronization function: it reads all the data from the database and then publishes events to synchronize it

public class SyncDataServiceImpl implements SyncDataService {

    @Override
    public boolean syncAll(final DataEventTypeEnum type) {
        appAuthService.syncData();
        List<PluginData> pluginDataList = pluginService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
        List<SelectorData> selectorDataList = selectorService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
        List<RuleData> ruleDataList = ruleService.listAll();
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
        metaDataService.syncData();
        return true;
    }
}

If you have written Websocket code before, the following function will look familiar: it receives a message and then calls the processing logic

During debugging, the value of message is "MYSELF", so we assume the initial Websocket convention is that on receiving "MYSELF", Admin synchronizes the full data to the Bootstrap

public class WebsocketCollector {

    @OnMessage
    public void onMessage(final String message, final Session session) {
        // message == MYSELF
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            try {
                // Save the client Session information
                ThreadLocalUtil.put(SESSION_KEY, session);
                SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
            } finally {
                ThreadLocalUtil.clear();
            }
        }
    }
}

Going back to the Websocket send function, there are two things to note:

1. When the event type is MYSELF, the message is sent only to one specific client

In the onMessage function above, the session is saved in a ThreadLocal; here it is retrieved and used to send the message

This is presumably for a newly connected Bootstrap, so that the full data is sent only to it

2. If the event type is not MYSELF, the message is sent to all clients

This is presumably for change events, where the data is synchronized to all clients

public class WebsocketCollector {

    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            if (DataEventTypeEnum.MYSELF == type) {
                Session session = (Session) ThreadLocalUtil.get(SESSION_KEY);
                if (session != null) {
                    sendMessageBySession(session, message);
                }
            } else {
                SESSION_SET.forEach(session -> sendMessageBySession(session, message));
            }
        }
    }
}
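
The routing in the send function above can be sketched in plain Java. This is a simplified illustration, not the Soul code: FakeSession, SessionRoutingSketch, and the "full-data" payload are hypothetical stand-ins, and a bare ThreadLocal replaces ThreadLocalUtil. It also suggests why a single shared key can be enough: the key may be the same for every client, but a value stored in a ThreadLocal is visible only to the thread that is handling that client's message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for a Websocket session; it only records what it was sent.
class FakeSession {
    final List<String> received = new ArrayList<>();

    void sendText(String message) {
        received.add(message);
    }
}

class SessionRoutingSketch {
    enum EventType { MYSELF, CREATE, UPDATE, DELETE }

    // All connected clients (the role SESSION_SET plays in the code above).
    static final Set<FakeSession> SESSIONS = new CopyOnWriteArraySet<>();
    // The session being served by the current thread.
    static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();

    // A client asks for the full data by sending "MYSELF".
    static void onMessage(String message, FakeSession session) {
        if (EventType.MYSELF.name().equals(message)) {
            CURRENT.set(session);
            try {
                send("full-data", EventType.MYSELF);
            } finally {
                CURRENT.remove(); // do not leak the session to the next task on this thread
            }
        }
    }

    // MYSELF goes to the requesting session only; everything else is broadcast.
    static void send(String message, EventType type) {
        if (type == EventType.MYSELF) {
            FakeSession session = CURRENT.get();
            if (session != null) {
                session.sendText(message);
            }
        } else {
            SESSIONS.forEach(s -> s.sendText(message));
        }
    }

    public static void main(String[] args) {
        FakeSession a = new FakeSession();
        FakeSession b = new FakeSession();
        SESSIONS.add(a);
        SESSIONS.add(b);
        onMessage("MYSELF", b);                  // only b gets the full data
        send("plugin-update", EventType.UPDATE); // both get the change
        System.out.println(a.received); // [plugin-update]
        System.out.println(b.received); // [full-data, plugin-update]
    }
}
```

Passing the session through a ThreadLocal lets the full synchronization reuse the same publish and listen path as ordinary change events, without adding a session parameter to every listener method.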

Here the session is retrieved from a ThreadLocal, yet SESSION_KEY is the same for every client. This is a bit confusing at first, so we will look at it later

In the past, we handled this kind of client-specific processing directly in onMessage, or gave each client an ID when it connected

I feel like I’ve learned something from this

Data changes

With the initial synchronization finished, we change the status of the rate limiting plugin in the Admin console, which triggers the function where we set the breakpoint at the beginning:

public class WebsocketDataChangedListener implements DataChangedListener {

    @Override
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
}

Tracing the call stack and skipping over the event dispatching, we come back to the familiar PluginServiceImpl, where the plugin data is updated and the event is published

public class PluginServiceImpl implements PluginService {

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String createOrUpdate(final PluginDTO pluginDTO) {
        final String msg = checkData(pluginDTO);
        if (StringUtils.isNoneBlank(msg)) {
            return msg;
        }
        PluginDO pluginDO = PluginDO.buildPluginDO(pluginDTO);
        DataEventTypeEnum eventType = DataEventTypeEnum.CREATE;
        if (StringUtils.isBlank(pluginDTO.getId())) {
            pluginMapper.insertSelective(pluginDO);
        } else {
            eventType = DataEventTypeEnum.UPDATE;
            pluginMapper.updateSelective(pluginDO);
        }

        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, eventType,
                Collections.singletonList(PluginTransfer.INSTANCE.mapToData(pluginDO))));
        return StringUtils.EMPTY;
    }
}

Continue tracing to the Controller interface, which is where the update event is triggered

@RestController
@RequestMapping("/plugin")
public class PluginController {

    @PutMapping("/{id}")
    public SoulAdminResult updatePlugin(@PathVariable("id") final String id, @RequestBody final PluginDTO pluginDTO) {
        Objects.requireNonNull(pluginDTO);
        pluginDTO.setId(id);
        final String result = pluginService.createOrUpdate(pluginDTO);
        if (StringUtils.isNoneBlank(result)) {
            return SoulAdminResult.error(result);
        }
        return SoulAdminResult.success(SoulResultMessage.UPDATE_SUCCESS);
    }
}

Conclusion

This article is a preliminary exploration of how Websocket data synchronization is processed on the Admin side. The flow can be roughly divided into initialization and data update (including deletion)

  • Data initialization: Bootstrap sends the MYSELF message to trigger a full data synchronization

  • Data processing (listener class WebsocketDataChangedListener)

    • HTTP interface call: the caller can be the Admin console or a service registration client
    • Service invocation: updates the data in the database and calls the event publishing interface
    • Event publishing: events are dispatched to the data synchronization listeners
    • Data update: after receiving the event, each listener applies the update (Websocket push, Zookeeper write, HTTP MD5 update, Nacos write)
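
The steps listed above can be sketched as a small, dependency-free pipeline. This is an illustration under assumptions, not the Soul code: SyncFlowSketch and its nested types are hypothetical, an in-memory set stands in for the database, set membership decides between CREATE and UPDATE (the real service checks whether the ID is blank), and a hand-written dispatcher replaces Spring's event publisher.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SyncFlowSketch {
    enum EventType { CREATE, UPDATE }

    // A change event: how the data changed and which plugin it concerns.
    static final class ChangeEvent {
        final EventType type;
        final String plugin;

        ChangeEvent(EventType type, String plugin) {
            this.type = type;
            this.plugin = plugin;
        }
    }

    // One listener per synchronization mode (Websocket, Zookeeper, ...).
    interface ChangeListener {
        void onPluginChanged(ChangeEvent event);
    }

    // Event publishing: fan each event out to every registered listener.
    static final class Dispatcher {
        private final List<ChangeListener> listeners = new ArrayList<>();

        void register(ChangeListener listener) {
            listeners.add(listener);
        }

        void publish(ChangeEvent event) {
            listeners.forEach(l -> l.onPluginChanged(event));
        }
    }

    // Service invocation: update the stored data, then publish the event.
    static final class PluginService {
        private final Set<String> table = new HashSet<>();
        private final Dispatcher dispatcher;

        PluginService(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        void createOrUpdate(String plugin) {
            // Set.add returns true only when the plugin was not stored yet.
            EventType type = table.add(plugin) ? EventType.CREATE : EventType.UPDATE;
            dispatcher.publish(new ChangeEvent(type, plugin));
        }
    }

    public static void main(String[] args) {
        List<String> websocketLog = new ArrayList<>();
        List<String> zookeeperLog = new ArrayList<>();
        Dispatcher dispatcher = new Dispatcher();
        // Data update: each listener reacts in its own way.
        dispatcher.register(e -> websocketLog.add("push " + e.type + " " + e.plugin));
        dispatcher.register(e -> zookeeperLog.add("write " + e.type + " " + e.plugin));

        // HTTP interface call: a controller would call the service like this.
        PluginService service = new PluginService(dispatcher);
        service.createOrUpdate("rate_limiter");
        service.createOrUpdate("rate_limiter");

        System.out.println(websocketLog); // [push CREATE rate_limiter, push UPDATE rate_limiter]
        System.out.println(zookeeperLog); // [write CREATE rate_limiter, write UPDATE rate_limiter]
    }
}
```

Only the listener bodies differ between synchronization modes; the controller, service, and dispatcher stay the same, which matches what the three articles observed.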

Across the three articles on Zookeeper, Nacos, and Websocket, the processing flow is basically the same: the initialization entry point, event triggering, and dispatching are identical to the previous articles, and only the specific sending logic differs

You can see that the structure of the code is very clear