Soul Gateway Study Notes: HTTP Long Polling Analysis 01

Author: Zhu Ming

Data Synchronization between the Admin Backend and the Gateway (HTTP Long Polling)

Configuration

Switching the sync mode on the admin backend

As with ZooKeeper synchronization, the DataSyncConfiguration class is where the synchronization strategy is selected. Its conditional properties show what to configure: to switch the admin backend to HTTP long polling, disable websocket and enable http.

soul:
  sync:
    websocket:
      enabled: false
    http:
      enabled: true

Switching the sync mode on the gateway

With the admin backend switched over, the gateway is next. As before, the settings can be found by following the properties on the key configuration classes. The gateway configuration is shown here directly:

soul:
  sync:
#    websocket:
#      urls: ws://localhost:9095/websocket
    http:
      url: http://localhost:9095

The DataChangedListener hierarchy

On the admin backend, DataSyncConfiguration registers the key beans for data synchronization. Here are the beans for HTTP long polling:

@Configuration
public class DataSyncConfiguration {
  
  @Configuration
  @ConditionalOnProperty(name = "soul.sync.http.enabled", havingValue = "true")
  @EnableConfigurationProperties(HttpSyncProperties.class)
  static class HttpLongPollingListener {

    @Bean
    @ConditionalOnMissingBean(HttpLongPollingDataChangedListener.class)
    public HttpLongPollingDataChangedListener httpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
      return new HttpLongPollingDataChangedListener(httpSyncProperties);
    }
  }
}

HttpLongPollingDataChangedListener extends AbstractDataChangedListener, which in turn implements the DataChangedListener interface.

We should already be familiar with the DataChangedListener interface: it declares one change method per data type, and those methods are called by DataChangedEventDispatcher. The dispatcher is an old friend too. It acts as a transit point, classifying data synchronization events and distributing them to the listeners.

public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
  // Hold the DataChangedListener collection
  private List<DataChangedListener> listeners;
  
  // A method to notify DataChangedListener of different event types when an event changes
  public void onApplicationEvent(final DataChangedEvent event) {
    for (DataChangedListener listener : listeners) {
      switch (event.getGroupKey()) {
        case APP_AUTH:
          listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
          break;
        case PLUGIN:
          listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
          break;
        case RULE:
          listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
          break;
        case SELECTOR:
          listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
          break;
        case META_DATA:
          listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
          break;
        default:
          throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
      }
    }
  }
}

public interface DataChangedListener {
	
  default void onAppAuthChanged(List<AppAuthData> changed, DataEventTypeEnum eventType) {}

  default void onPluginChanged(List<PluginData> changed, DataEventTypeEnum eventType) {}

  default void onSelectorChanged(List<SelectorData> changed, DataEventTypeEnum eventType) {}

  default void onMetaDataChanged(List<MetaData> changed, DataEventTypeEnum eventType) {}

  default void onRuleChanged(List<RuleData> changed, DataEventTypeEnum eventType) {}
}

With the roles of those two understood, what does AbstractDataChangedListener do? Take onPluginChanged() as an example:

public abstract class AbstractDataChangedListener implements DataChangedListener, InitializingBean {
  
  protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();

  @Override
  public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    if (CollectionUtils.isEmpty(changed)) {
      return;
    }
    this.updatePluginCache();
    this.afterPluginChanged(changed, eventType);
  }
  
  // Modify the cache (rewritable)
  protected void updatePluginCache() {
    this.updateCache(ConfigGroupEnum.PLUGIN, pluginService.listAll());
  }
  
  protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
    String json = GsonUtils.getInstance().toJson(data);
    ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
    ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
    log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
  }
  
  // Hook, customize what to do after data changes (rewritable)
  protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
  }
}

For a plugin data change (onPluginChanged), AbstractDataChangedListener effectively defines a template: the work proceeds in a fixed order of steps, and the details of each step can be supplied by subclasses.

Second, unless a subclass overrides the cache update, the data is maintained in this class's CACHE.
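The template idea can be shown in isolation. The following is a minimal sketch, not Soul's code: the class names TemplateListenerSketch, AbstractListener and RecordingListener are hypothetical, plugin data is reduced to plain strings, the plugin names are sample values, and a string stands in for ConfigDataCache.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for the listener hierarchy; not Soul's actual classes.
class TemplateListenerSketch {

  // Shared cache keyed by group name; a plain string stands in for ConfigDataCache.
  static final ConcurrentMap<String, String> CACHE = new ConcurrentHashMap<>();

  abstract static class AbstractListener {

    // Template method: the order of the steps is fixed here.
    public void onPluginChanged(final List<String> changed) {
      if (changed == null || changed.isEmpty()) {
        return;
      }
      updatePluginCache(changed);
      afterPluginChanged(changed);
    }

    // Default step (overridable). The original reloads the full list from a service;
    // this sketch caches the changed names directly to stay self-contained.
    protected void updatePluginCache(final List<String> data) {
      CACHE.put("PLUGIN", String.join(",", data));
    }

    // Hook (overridable): does nothing by default.
    protected void afterPluginChanged(final List<String> changed) {
    }
  }

  // A subclass that only fills in the hook and inherits the cache step.
  static class RecordingListener extends AbstractListener {
    final List<String> notified = new ArrayList<>();

    @Override
    protected void afterPluginChanged(final List<String> changed) {
      notified.addAll(changed);
    }
  }

  public static void main(final String[] args) {
    RecordingListener listener = new RecordingListener();
    listener.onPluginChanged(Arrays.asList("divide", "rateLimiter"));
    System.out.println("cache: " + CACHE.get("PLUGIN"));
    System.out.println("hook saw: " + listener.notified);
  }
}
```

A subclass that overrides only the hook still gets the cache update for free, which is the property the long polling listener relies on.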

What are the other synchronization policies doing at this point?

When DataChangedEventDispatcher calls onPluginChanged(), how does the long polling module respond? First, let’s look at what the other synchronization modes do at this point.

For example, the WebSocket mode overrides onPluginChanged() itself and sends a WebSocket message to the sessions it holds, which include the gateway's.

public class WebsocketDataChangedListener implements DataChangedListener {
  
  @Override
  public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
    WebsocketData<PluginData> websocketData =
      new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
    WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
  }
}

The ZooKeeper mode also overrides onPluginChanged(). It updates the node data on ZooKeeper, and the gateway watches those nodes for changes.

public class ZookeeperDataChangedListener implements DataChangedListener {
  
  @Override
  public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    for (PluginData data : changed) {
      String pluginPath = ZkPathConstants.buildPluginPath(data.getName());
      // delete
      if (eventType == DataEventTypeEnum.DELETE) {
        deleteZkPathRecursive(pluginPath);
        String selectorParentPath = ZkPathConstants.buildSelectorParentPath(data.getName());
        deleteZkPathRecursive(selectorParentPath);
        String ruleParentPath = ZkPathConstants.buildRuleParentPath(data.getName());
        deleteZkPathRecursive(ruleParentPath);
        continue;
      }
      // create or update
      insertZkNode(pluginPath, data);
    }
  }
}

As you can see, by this point, other synchronization strategies are already busy notifying the gateway, so Http long polling must do the same.

The two strategies notify the gateway differently. WebSocket pushes actively: it finds the session and delivers the message all the way to the gateway. ZooKeeper only updates the node data and does not care who consumes it; the gateway watches for the change and then synchronizes.

So how does HTTP long polling notify the gateway? Let's read on.

Thinking through a long polling implementation

Let’s think about how long polling would work if I designed it myself.

A typical long polling implementation is initiated by the gateway. The admin backend receives the request and holds it, returning at once if there is an update and otherwise blocking for a certain amount of time. Meanwhile the admin backend applies the data updates and checks, for the requests it holds, whether anything has changed.

So there are three points:

  1. How do we know whether the data has changed? Do we record the last update time and compare it with the gateway's request time?
  2. After holding the request, how does the admin backend learn that the data was updated: by checking in a loop, or by blocking and waiting?
  3. Where should the data used for updates live? If it is cached, how does the admin backend's cache interact with the database?

Long polling in HttpLongPollingDataChangedListener

With those questions in mind, let's see how HttpLongPollingDataChangedListener does it, starting with how it builds on the parent class's onPluginChanged():

public class HttpLongPollingDataChangedListener extends AbstractDataChangedListener {
  
  private final ScheduledExecutorService scheduler;
  
  @Override
  protected void afterPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    scheduler.execute(new DataChangeTask(ConfigGroupEnum.PLUGIN));
  }
}

HTTP long polling does not override onPluginChanged(); it uses the parent class's implementation as is, which means it relies on the parent's CACHE. We will eventually need to analyze that cache to see how the data is retrieved.

The parent then calls our implementation’s afterPluginChanged() method, which uses a scheduled thread pool to run a Runnable task, DataChangeTask.

class DataChangeTask implements Runnable {
  
  @Override
  public void run() {
    // Traverse the clients
    for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
      LongPollingClient client = iter.next();
      iter.remove();
      // The response is complete
      client.sendResponse(Collections.singletonList(groupKey));
      log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
    }
  }
}

After the data changes, this task runs on the thread pool. It iterates over all clients, removes each one as it goes, and calls sendResponse(), which appears to mark the response as completed.

Let me guess what it does. clients is probably the set of gateway requests being held, and sendResponse() probably writes the response into the request context. The other key actions are ending the hold so that the gateway receives the response, and removing the request from the collection.

Tracing clients, we find it is a BlockingQueue held by HttpLongPollingDataChangedListener, and LongPollingClient is where entries are added to it and removed on a timer:

class LongPollingClient implements Runnable {
  
  @Override
  public void run() {
    this.asyncTimeoutFuture = scheduler.schedule(() -> {
      clients.remove(LongPollingClient.this);
      List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
      sendResponse(changedGroups);
    }, timeoutTime, TimeUnit.MILLISECONDS);
    // Here is the key, indicating the source
    clients.add(this);
  }
}

Set the remove() block aside for now and look at the add() on the last line: that is where the entries in clients come from.

Next, find where LongPollingClient is created: HttpLongPollingDataChangedListener#doLongPolling.

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {

  // ...

  // listen for configuration changed.
  // Put the request into asynchronous mode so that it can be held open
  final AsyncContext asyncContext = request.startAsync();

  // AsyncContext.setTimeout() does not time out reliably here, so the timeout is controlled manually
  asyncContext.setTimeout(0L);

  // block client's thread.
  // The thread pool calls LongPollingClient#run
  scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}

The last line here runs LongPollingClient, which adds itself to clients. The critical line that allows the request to be held is:

final AsyncContext asyncContext = request.startAsync();

As we guessed earlier, LongPollingClient#sendResponse not only writes the response data but also releases the held request:

class LongPollingClient implements Runnable {
  
  void sendResponse(final List<ConfigGroupEnum> changedGroups) {
    // Cancel the pending timeout task
    if (null != asyncTimeoutFuture) {
      asyncTimeoutFuture.cancel(false);
    }
    generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
    // Complete the asynchronous request, releasing the held connection
    asyncContext.complete();
  }
}

Back in doLongPolling(), the thread pool call has one more key point:

scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));

LongPollingClient is created with a maximum hold time (60 seconds). Remember the piece of code we skipped in LongPollingClient#run?

class LongPollingClient implements Runnable {
  
  @Override
  public void run() {
    // Timed startup with a delay based on timeoutTime
    this.asyncTimeoutFuture = scheduler.schedule(() -> {
      // Remove the managed connection
      clients.remove(LongPollingClient.this);
      List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
      // This method will release the blocked request
      sendResponse(changedGroups);
    }, timeoutTime, TimeUnit.MILLISECONDS);

    clients.add(this);
  }
}
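The hold, release, and timeout interplay can be reproduced with the standard library alone. This is a minimal sketch under stated assumptions, not the admin backend's code: a CompletableFuture stands in for the held servlet request, and the names LongPollHoldSketch, HeldClient, hold() and dataChanged() are hypothetical. The queue capacity of 1024 is an arbitrary choice for the sketch.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: a CompletableFuture plays the role of the held request.
class LongPollHoldSketch {

  final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  final BlockingQueue<HeldClient> clients = new ArrayBlockingQueue<>(1024);

  class HeldClient implements Runnable {
    final CompletableFuture<List<String>> response = new CompletableFuture<>();
    final long timeoutMillis;
    volatile ScheduledFuture<?> timeoutFuture;

    HeldClient(final long timeoutMillis) {
      this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void run() {
      // Arm the timeout: if nothing changes in time, release with an empty list.
      timeoutFuture = scheduler.schedule(() -> {
        clients.remove(this);
        sendResponse(Collections.<String>emptyList());
      }, timeoutMillis, TimeUnit.MILLISECONDS);
      // Register so that a change task can find this client.
      clients.add(this);
    }

    void sendResponse(final List<String> changedGroups) {
      // Cancel the pending timeout, then release the holder.
      if (timeoutFuture != null) {
        timeoutFuture.cancel(false);
      }
      response.complete(changedGroups);
    }
  }

  // Called when a "request" arrives; the returned future completes on release.
  CompletableFuture<List<String>> hold(final long timeoutMillis) {
    HeldClient client = new HeldClient(timeoutMillis);
    scheduler.execute(client);
    return client.response;
  }

  // Called when data changes; releases every held client with the changed group.
  void dataChanged(final String group) {
    scheduler.execute(() -> {
      for (Iterator<HeldClient> iter = clients.iterator(); iter.hasNext();) {
        HeldClient client = iter.next();
        iter.remove();
        client.sendResponse(Collections.singletonList(group));
      }
    });
  }

  public static void main(final String[] args) throws Exception {
    LongPollHoldSketch server = new LongPollHoldSketch();
    CompletableFuture<List<String>> byChange = server.hold(5_000);
    server.dataChanged("PLUGIN");
    System.out.println("released by change: " + byChange.get(1, TimeUnit.SECONDS));
    CompletableFuture<List<String>> byTimeout = server.hold(100);
    System.out.println("released by timeout: " + byTimeout.get(1, TimeUnit.SECONDS));
    server.scheduler.shutdown();
  }
}
```

The sketch uses a single scheduler thread, so registration and release run one after the other in submission order.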

Now that we know how the admin backend implements the long polling flow, let’s see where doLongPolling() is called:

@ConditionalOnBean(HttpLongPollingDataChangedListener.class)
@RestController
@RequestMapping("/configs")
@Slf4j
public class ConfigController {
  
  @PostMapping(value = "/listener")
  public void listener(final HttpServletRequest request, final HttpServletResponse response) {
    longPollingListener.doLongPolling(request, response);
  }
}

This is straightforward: the admin backend exposes an HTTP endpoint through this controller, which the gateway calls to listen for data changes.

Conclusion

  • The admin backend exposes an API to the gateway through the controller layer. When the gateway sends a request, the admin backend does not respond immediately (whether or not the data has changed) but holds the request for up to 60 seconds. The held requests are added to a blocking queue that serves as an in-memory cache.
  • If data changes within those 60 seconds, DataChangedEventDispatcher dispatches the event to HttpLongPollingDataChangedListener, which immediately uses the thread pool to iterate over all held requests in the blocking queue, fill in the response, and release them.
  • If nothing changes within 60 seconds, the held request is released and the corresponding object is removed from the blocking queue.

So far we have worked out the basic long polling logic. Let's return to the questions we started with and see which conclusions or doubts remain.

  1. How do we know whether the data has changed? Do we record the last update time and compare it with the gateway's request time?
  2. After holding the request, how does the admin backend learn that the data was updated: by checking in a loop, or by blocking and waiting?
  3. Where should the data used for updates live? If it is cached, how does the admin backend's cache interact with the database?

For point 1, how do we know that the data has changed?

  • The source of change notifications we analyzed is DataChangedEventDispatcher. It does not fire only when data actually changes: every manual click on synchronize in the admin backend also calls it immediately.

    There must be some sort of comparison between old and new data, or each call will simply release the blocking request from the gateway, which is not a good design.
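One way such a comparison could work is to compare digests per group, in the spirit of the MD5 value stored in ConfigDataCache. The following is a sketch under that assumption only; how the admin backend actually compares is left to the next chapter. The names DigestCompareSketch, update() and changedGroups() are hypothetical, and the JSON strings are sample values.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: detect changed groups by comparing MD5 digests.
class DigestCompareSketch {

  // Server-side digest per group, sorted by group name for stable output.
  final Map<String, String> serverDigests = new ConcurrentSkipListMap<>();

  // Record the digest of the latest serialized data for a group.
  void update(final String group, final String json) {
    serverDigests.put(group, md5(json));
  }

  // Return the groups whose server digest differs from the one the client reports.
  List<String> changedGroups(final Map<String, String> clientDigests) {
    List<String> changed = new ArrayList<>();
    for (Map.Entry<String, String> entry : serverDigests.entrySet()) {
      if (!entry.getValue().equals(clientDigests.get(entry.getKey()))) {
        changed.add(entry.getKey());
      }
    }
    return changed;
  }

  // Hex-encoded MD5 of a UTF-8 string.
  static String md5(final String text) {
    try {
      MessageDigest digest = MessageDigest.getInstance("MD5");
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest(text.getBytes(StandardCharsets.UTF_8))) {
        hex.append(String.format("%02x", b & 0xff));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(final String[] args) {
    DigestCompareSketch server = new DigestCompareSketch();
    server.update("PLUGIN", "[\"divide\"]");
    server.update("RULE", "[]");
    // The client remembers the digests it saw on its last sync.
    Map<String, String> client = new HashMap<>(server.serverDigests);
    System.out.println("before change: " + server.changedGroups(client));
    server.update("PLUGIN", "[\"divide\",\"rateLimiter\"]");
    System.out.println("after change: " + server.changedGroups(client));
  }
}
```

With a check like this, a manual synchronize that changes nothing would yield an empty list, so held requests would not need to be released.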

For point 2, we now know that the approach is to block and wait, using AsyncContext. I was not familiar with it either, so I will cover it in more detail later.

For point 3, we know that configuration changes made in the admin backend must be written to the database, so the interaction between the cache and the database is also worth analyzing. These questions will be discussed in the next chapter.