Soul Gateway learning Http long polling parsing 02

Author: zhu Ming

Data Synchronization between the Background and gateway (Http Long Polling)

The last chapter of long polling analysis summarizes the implementation of long polling on the gateway side and the way of data flow.

The process of long polling on the gateway side is also divided into two modules: one is pull at startup, the other is polling to monitor changes

The gateway pulls data during startup

After the gateway is started, the interface provided by the background will be called to pull the data, and the data will be sent to the data processing classes of each plug-in

The following shows the processing flow of pull data initiated by the lower gateway:

These processing steps are dispersed into the method collaboration of the following classes:

HttpSyncDataService#start: When the gateway is started, HttpSyncDataService initializes a call to the start() method, which calls the background pull data and starts multiple threads for polling.

public class HttpSyncDataService implements SyncDataService.AutoCloseable {
  
  private void start(a) {
    // Prevent the CAS operation from being invoked twice
    if (RUNNING.compareAndSet(false.true)) {
      // Here is the focus of this process, call pull data method
      this.fetchGroupConfig(ConfigGroupEnum.values());
      int threadSize = serverList.size();
      // Thread polling listening will be enabled according to the background cluster
      this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                                             new LinkedBlockingQueue<>(),
                                             SoulThreadFactory.create("http-long-polling".true));
      this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
    } else {
      log.info("soul http long polling was started, executor=[{}]", executor); }}}Copy the code

HttpSyncDataService#fetchGroupConfig: The function is simply to call the pull data method repeatedly according to the data type (for the same background will be requested multiple times, each pull data type information), here refers to the data type refers to plugin, rule, selector, etc

private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
  for (int index = 0; index < this.serverList.size(); index++) {
    String server = serverList.get(index);
    try {
			// Call the pull data method multiple times based on the data type enumeration passed in
      this.doFetchGroupConfig(server, groups);
      break;
    } catch (SoulException e) {
      if (index >= serverList.size() - 1) {
        throw e;
      }
      log.warn("fetch config fail, try another one: {}", serverList.get(index + 1)); }}}Copy the code

HttpSyncDataService#doFetchGroupConfig: Request /configs/fetch to fetch a certain type of data and update the cache. Before updating the cache, it will detect whether there is any change, and if there is no change, it will sleep 30s. (Since it is the first time to start, the cache will be updated if the data is empty, so it will directly end.)

private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
  StringBuilder params = new StringBuilder();
  for (ConfigGroupEnum groupKey : groups) {
    params.append("groupKeys").append("=").append(groupKey.name()).append("&");
  }
  // Specify the request path, pull background data
  String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
  log.info("request configs: [{}]", url);
  String json = null;
  try {
    json = this.httpClient.getForObject(url, String.class);
  } catch (RestClientException e) {
    String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
    log.warn(message);
    throw new SoulException(message, e);
  }
  // Modify the cache information
  boolean updated = this.updateCacheWithJson(json);
  // Determine whether to modify, modify the end
  if (updated) {
    log.info("get latest configs: [{}]", json);
    return;
  }
  log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
  ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
Copy the code

HttpSyncDataService#updateCacheWithJson: fetch the data in the response, the changed data, and pass it to the data refresh factory DataRefreshFactory

private DataRefreshFactory factory;

public HttpSyncDataService(...).{
  this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
}

private boolean updateCacheWithJson(final String json) {
  JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
  JsonObject data = jsonObject.getAsJsonObject("data");
  return factory.executor(data);
}
Copy the code

Datarefreshfactor # Executor: Sends data to various data refresh classes.

public final class DataRefreshFactory {

  private static final EnumMap<ConfigGroupEnum, DataRefresh> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);

  public DataRefreshFactory(final PluginDataSubscriber pluginDataSubscriber,
                              final List<MetaDataSubscriber> metaDataSubscribers,
                              final List<AuthDataSubscriber> authDataSubscribers) {
    // Inject each type of subscriber into the MAP
    ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataRefresh(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataRefresh(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataRefresh(pluginDataSubscriber));
    ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AppAuthDataRefresh(authDataSubscribers));
    ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataRefresh(metaDataSubscribers));
  }
  
  public boolean executor(final JsonObject data) {
    final boolean[] success = {false};
    // Tureen: Call all DataRefresh for all data types
    ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
    return success[0]; }}Copy the code

AbstractDataRefresh#refresh: determines whether to update the cache and calls each type of refresh() method if so

@Override
public Boolean refresh(final JsonObject data) {
  boolean updated = false;
  JsonObject jsonObject = convert(data);
  if (null! = jsonObject) { ConfigData<T> result = fromJson(jsonObject);if (this.updateCacheIfNeed(result)) {
      updated = true;
      // Turren: Call refreshrefresh(result.getData()); }}return updated;
}
Copy the code

PluginDataRefresh#refresh: calls the subscriber of the plugin, which then notifies all extension plug-ins of any changes related to events

@Override
protected void refresh(final List<PluginData> data) {
  if (CollectionUtils.isEmpty(data)) {
    log.info("clear all plugin data cache");
    pluginDataSubscriber.refreshPluginDataAll();
  } else {
    pluginDataSubscriber.refreshPluginDataAll();
    // Turren: HTTP synchronization, calling the plug-in data subscriberdata.forEach(pluginDataSubscriber::onSubscribe); }}Copy the code

Gateway polling listens for changes

When the gateway is started, the thread is also started to do background listening request. The listening request is polling in a while loop, which will hijack the request in the background. This is analyzed in the background summary (data synchronization between background and gateway (Http long polling)).

The overall process of monitoring data changes of the gateway is shown below:

The corresponding actual code implementation is as follows:

The process implementation for listening on the gateway side is all in the HttpSyncDataService class, which is passed at the enddoFetchGroupConfig()To the various subscribers, the following process is the same as at startup

HttpSyncDataService#start: Starts the thread to execute the Runnable HttpLongPollingTask

HttpLongPollingTask#run: enable the circular call polling method.

@Override
public void run(a) {
  while (RUNNING.get()) {
    for (int time = 1; time <= retryTimes; time++) {
      try {
        doLongPolling(server);
      } catch (Exception e) {
        if (time < retryTimes) {
          log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                   time, retryTimes - time, e.getMessage());
          ThreadUtils.sleep(TimeUnit.SECONDS, 5);
          continue;
        }
        log.error("Long polling failed, try again after 5 minutes!", e);
        ThreadUtils.sleep(TimeUnit.MINUTES, 5); }}}}Copy the code

HttpLongPollingTask#doLongPolling: Gets the result of the response to the listening request, and calls the data pull method if there is a change in the returned value.

private void doLongPolling(final String server) {
  // Retrieve data from the cache
  MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
  for(ConfigGroupEnum group : ConfigGroupEnum.values()) { ConfigData<? > cacheConfig = factory.cacheConfigData(group); String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
    params.put(group.name(), Lists.newArrayList(value));
  }
  // Build the HTTP request information
  HttpHeaders headers = new HttpHeaders();
  headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
  HttpEntity httpEntity = new HttpEntity(params, headers);
  String listenerUrl = server + "/configs/listener";
  log.debug("request listener configs: [{}]", listenerUrl);
  JsonArray groupJson = null;
  try {
    String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
    groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
  } catch (RestClientException e) {
    String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
    throw new SoulException(message, e);
  }
  // Get the type of change
  if(groupJson ! =null) {
    ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
    if (ArrayUtils.isNotEmpty(changedGroups)) {
      log.info("Group config changed: {}", Arrays.toString(changedGroups));
      // Pull the corresponding type of data in the background
      this.doFetchGroupConfig(server, changedGroups); }}}Copy the code

LongPollingClient#doFetchGroupConfig:

This code was analyzed in the previous startup, and the most different point from the startup is that if the pulled data is compared with the cache and no change is found, the sleep will be 30 seconds, which will cause the next listening to be delayed 30 seconds.

What does that mean? If the gateway goes to fetch the data in the background, it finds that it is cheated after comparison! There is no change, I will wait 30 seconds to start the next listening, during which if the background data changes, the gateway will certainly not be notified.

Why does the gateway do this? Nature is to prevent a large number of cycles of useless pull, if there is a problem in the background constantly notify data changes, but there is no actual change, then the gateway without delay will produce a large number of useless network IO and data exchange with the background