First, preparation
1. Enable HTTP data synchronization in the soul-admin (console) and soul-bootstrap (gateway service) configurations
2. Start the console and gateway services
Second, data synchronization through HTTP long polling
1. Start with the console logs
Learning about a mature project can start with its official documentation, unit tests, or logs. My guess is that the client runs a scheduled task that keeps polling the server for data; that is, the gateway keeps polling the console and continually refreshes its local cache.
When the gateway service starts, you can see this line in the gateway log:
...HttpSyncDataService: request configs: [http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA]
The gateway gets its data by requesting http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA. After a long wait, though, the gateway service log shows nothing new. That does not add up: there should be polling requests, so the gateway service evidently does not log everything it does.
Take a look at the console logs again.
It is easy to see that all kinds of data are refreshed every 5 minutes. Let's start from the HttpLongPollingDataChangedListener class that appears in the log.
First, the afterPropertiesSet method invokes HttpLongPollingDataChangedListener#afterInitialize, which is also the method that prints these log lines.
The InitializingBean interface gives a bean a callback that runs after its properties are set; it declares a single method, afterPropertiesSet. Classes that implement this interface have the method executed once the bean's properties have been initialized.
@Override
protected void afterInitialize() {
    long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
    // Periodically check the data for changes and update the cache
    scheduler.scheduleWithFixedDelay(() -> {
        log.info("http sync strategy refresh config start.");
        try {
            this.refreshLocalCache();
            log.info("http sync strategy refresh config success.");
        } catch (Exception e) {
            log.error("http sync strategy refresh config error!", e);
        }
    }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
    log.info("http sync strategy refresh interval: {}ms", syncInterval);
}

private void refreshLocalCache() {
    this.updateAppAuthCache();
    this.updatePluginCache();
    this.updateRuleCache();
    this.updateSelectorCache();
    this.updateMetaDataCache();
}
As we can see, initializing the HttpLongPollingDataChangedListener bean creates a scheduled task that periodically checks the data and writes it to the local cache. And then? The data is only written to the console's JVM memory; nothing is sent to the gateway service.
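To get a feel for the fixed-delay refresh, here is a minimal sketch in plain Java. It is not Soul's code: RefreshSketch, CACHE, REFRESH_COUNT, and startRefresh are names made up for illustration, and the loader just writes a counter instead of reading a database.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RefreshSketch {
    // Hypothetical stand-in for the console's in-memory cache.
    static final Map<String, String> CACHE = new ConcurrentHashMap<>();
    static final AtomicInteger REFRESH_COUNT = new AtomicInteger();

    // Hypothetical loader; the real method reloads each data group.
    static void refreshLocalCache() {
        int n = REFRESH_COUNT.incrementAndGet();
        CACHE.put("PLUGIN", "plugin-data-v" + n);
    }

    static ScheduledExecutorService startRefresh(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refreshLocalCache();
            } catch (Exception e) {
                // Catch everything: an exception escaping the task
                // would cancel all later runs.
                System.err.println("refresh failed: " + e.getMessage());
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = startRefresh(50);
        Thread.sleep(300);
        scheduler.shutdownNow();
        System.out.println("refreshed " + REFRESH_COUNT.get()
                + " times, PLUGIN=" + CACHE.get("PLUGIN"));
    }
}
```

The try/catch inside the task matters: with scheduleWithFixedDelay, an exception that escapes the task suppresses every subsequent execution, which is presumably why the original wraps refreshLocalCache the same way.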
2. The console receives HTTP requests
So let's look at the methods that read the cache Map, which leads to the Controller interface /configs/listener. The service-layer method it calls, HttpLongPollingDataChangedListener#doLongPolling, has a lot to teach.
public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    // compare group md5
    List<ConfigGroupEnum> changedGroup = compareChangedGroup(request);
    String clientIp = getRemoteIp(request);
    // response immediately.
    if (CollectionUtils.isNotEmpty(changedGroup)) {
        this.generateResponse(response, changedGroup);
        log.info("send response with the changed group, ip={}, group={}", clientIp, changedGroup);
        return;
    }
    // listen for configuration changed.
    final AsyncContext asyncContext = request.startAsync();
    // AsyncContext.setTimeout() does not time out properly, so you have to control it yourself
    asyncContext.setTimeout(0L);
    // block client's thread.
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, HttpConstants.SERVER_MAX_HOLD_TIMEOUT));
}
The first thing to notice is that the work is handed to a thread pool, which is initialized as follows:
this.scheduler = new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("long-polling", true));
HttpLongPollingDataChangedListener#compareChangedGroup calls the checkCacheDelayAndUpdate method. That method takes a lock and, once the lock has been acquired, checks again and updates the console's local cache.
In other words, the cache is updated and the query result is returned asynchronously, telling the gateway service that data has changed.
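The lock-then-recheck idea can be sketched roughly as follows. This is a simplified illustration under my own assumptions, not the body of checkCacheDelayAndUpdate: CacheCheckSketch, dbValue, cachedMd5, and clientIsStale are hypothetical names, and a string field plays the role of the database.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class CacheCheckSketch {
    static final ReentrantLock LOCK = new ReentrantLock();
    // Hypothetical "database" value and the MD5 of the cached copy.
    static volatile String dbValue = "v1";
    static volatile String cachedMd5 = md5("v1");

    static String md5(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder sb = new StringBuilder();
            for (byte b : md.digest(s.getBytes(StandardCharsets.UTF_8))) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true when the client should be told that its data is out of date.
    static boolean clientIsStale(String clientMd5) throws InterruptedException {
        if (clientMd5.equals(cachedMd5)) {
            return false; // fast path: nothing differs, no lock needed
        }
        // Only one caller at a time reloads the cache, so many polling
        // requests do not all hit the data source together.
        if (LOCK.tryLock(5, TimeUnit.SECONDS)) {
            try {
                cachedMd5 = md5(dbValue);              // reload under the lock
                return !clientMd5.equals(cachedMd5);   // compare again
            } finally {
                LOCK.unlock();
            }
        }
        return true; // could not get the lock in time: assume stale
    }
}
```

The point of the second comparison is that the mismatch may have been caused by a stale console cache rather than a stale client, and only the reload under the lock can tell the two cases apart.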
The pool has a single thread, so where does the asynchrony come from? Let's look at the run method of LongPollingClient to find out.
A single thread can still deliver good TPS when the processing logic is fast (typically no I/O). The logic can then run serially, without locks, which keeps the code simple.
@Override
public void run() {
    this.asyncTimeoutFuture = scheduler.schedule(() -> {
        clients.remove(LongPollingClient.this);
        List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
        sendResponse(changedGroups);
    }, timeoutTime, TimeUnit.MILLISECONDS);
    clients.add(this);
}
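Note that clients.remove(LongPollingClient.this) and clients.add(this) do not run back to back: add runs right away, while remove runs later, inside the scheduled timeout task.
The clients field used in the code above is initialized as a bounded blocking queue: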
this.clients = new ArrayBlockingQueue<>(1024);
The LongPollingClient#run method schedules the timeout handling on the scheduler thread and then puts the LongPollingClient object into the clients queue. What is this doing? It looks like holding the request open and returning the result asynchronously.
This is probably Tomcat's asynchronous Servlet support at work. The Soul gateway puts a lot of techniques into practice; impressive.
It is a little hard to follow, so drawing a thread model or sequence diagram later would be worthwhile. The code that takes clients off the queue, DataChangeTask#run, appears to be triggered by the same listener-event mechanism we saw two days ago.
@Override
public void run() {
    for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
        LongPollingClient client = iter.next();
        iter.remove();
        client.sendResponse(Collections.singletonList(groupKey));
        log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
    }
}
That is, when a change event arrives, the console's local cache is updated and the pending requests from the gateway service (held in the blocking queue) are answered, telling the gateway that data has changed.
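To make the hold-and-release pattern concrete, here is a minimal sketch in plain Java. It is not Soul's code: a CompletableFuture stands in for the servlet AsyncContext, and LongPollSketch, hold, onDataChanged, and the NO_CHANGE marker are names I made up for illustration.

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class LongPollSketch {
    // Pending requests wait here until a change or a timeout answers them.
    static final BlockingQueue<CompletableFuture<String>> CLIENTS = new ArrayBlockingQueue<>(1024);
    // One daemon thread handles both timeouts and change notifications.
    static final ScheduledExecutorService SCHEDULER =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "long-polling");
                t.setDaemon(true);
                return t;
            });

    // Hold a request open; the returned future completes later.
    static CompletableFuture<String> hold(long timeoutMillis) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        SCHEDULER.schedule(() -> {
            CLIENTS.remove(pending);
            pending.complete("NO_CHANGE"); // no-op if already answered
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        CLIENTS.add(pending);
        return pending;
    }

    // A change event answers every pending request at once.
    static void onDataChanged(String group) {
        SCHEDULER.execute(() -> {
            for (Iterator<CompletableFuture<String>> it = CLIENTS.iterator(); it.hasNext();) {
                CompletableFuture<String> pending = it.next();
                it.remove();
                pending.complete(group);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> a = hold(5000);
        onDataChanged("PLUGIN");
        System.out.println(a.get(1, TimeUnit.SECONDS)); // answered by the change
        CompletableFuture<String> b = hold(100);
        System.out.println(b.get(1, TimeUnit.SECONDS)); // answered by the timeout
    }
}
```

The request thread returns as soon as hold finishes, which is where the asynchrony comes from even though the scheduler has only one thread. In this sketch a late timeout is harmless because completing an already completed future does nothing; the original keeps the asyncTimeoutFuture handle, presumably so it can cancel the timeout instead.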
3. The gateway server sends an HTTP request
On the gateway side, the request is sent from HttpSyncDataService#doLongPolling:
private void doLongPolling(final String server) {
    MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
    for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
        ConfigData<?> cacheConfig = factory.cacheConfigData(group);
        String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
        params.put(group.name(), Lists.newArrayList(value));
    }
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
    HttpEntity httpEntity = new HttpEntity(params, headers);
    String listenerUrl = server + "/configs/listener";
    log.debug("request listener configs: [{}]", listenerUrl);
    JsonArray groupJson = null;
    try {
        String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
        log.debug("listener result: [{}]", json);
        groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
    } catch (RestClientException e) {
        String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
        throw new SoulException(message, e);
    }
    if (groupJson != null) {
        // fetch group configuration async.
        ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
        if (ArrayUtils.isNotEmpty(changedGroups)) {
            log.info("Group config changed: {}", Arrays.toString(changedGroups));
            this.doFetchGroupConfig(server, changedGroups);
        }
    }
}
Notice the call to this.doFetchGroupConfig(server, changedGroups). It shows that the /configs/listener interface only tells the gateway service whether data has changed and which groups changed. The gateway service then pulls the actual data through the /configs/fetch interface.
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    StringBuilder params = new StringBuilder();
    for (ConfigGroupEnum groupKey : groups) {
        params.append("groupKeys").append("=").append(groupKey.name()).append("&");
    }
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    log.info("request configs: [{}]", url);
    String json = null;
    try {
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
        log.warn(message);
        throw new SoulException(message, e);
    }
    // update local cache
    boolean updated = this.updateCacheWithJson(json);
    if (updated) {
        log.info("get latest configs: [{}]", json);
        return;
    }
    // not updated. it is likely that the current config server has not been updated yet. wait a moment.
    log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
    ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
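The fetch URL that doFetchGroupConfig assembles is just one groupKeys parameter per changed group. As a small standalone sketch of the same idea (FetchUrlSketch, ConfigGroup, and buildFetchUrl are hypothetical names, and the enum is a local copy of the group names seen in the log), the query string can also be built with a stream join, which avoids trimming a trailing "&":

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class FetchUrlSketch {
    // Local stand-in for the group names that appear in the gateway log.
    enum ConfigGroup { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

    // Builds server + /configs/fetch?groupKeys=A&groupKeys=B...
    static String buildFetchUrl(String server, ConfigGroup... groups) {
        String query = Arrays.stream(groups)
                .map(g -> "groupKeys=" + g.name())
                .collect(Collectors.joining("&"));
        return server + "/configs/fetch?" + query;
    }

    public static void main(String[] args) {
        // prints http://localhost:9095/configs/fetch?groupKeys=PLUGIN&groupKeys=RULE
        System.out.println(buildFetchUrl("http://localhost:9095",
                ConfigGroup.PLUGIN, ConfigGroup.RULE));
    }
}
```

Passing all five groups reproduces the URL printed in the gateway startup log, which fits the idea that startup is simply a fetch of every group.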
The /configs/fetch interface is familiar. It is the interface that the gateway service calls when it starts to fetch console data. Then we go back to the console interface code
public ConfigData<?> fetchConfig(final ConfigGroupEnum groupKey) {
    ConfigDataCache config = CACHE.get(groupKey.name());
    switch (groupKey) {
        case APP_AUTH:
            List<AppAuthData> appAuthList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<AppAuthData>>() {
            }.getType());
            return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), appAuthList);
        case PLUGIN:
            List<PluginData> pluginList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<PluginData>>() {
            }.getType());
            return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), pluginList);
        case RULE:
            List<RuleData> ruleList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<RuleData>>() {
            }.getType());
            return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), ruleList);
        case SELECTOR:
            List<SelectorData> selectorList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<SelectorData>>() {
            }.getType());
            return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), selectorList);
        case META_DATA:
            List<MetaData> metaList = GsonUtils.getGson().fromJson(config.getJson(), new TypeToken<List<MetaData>>() {
            }.getType());
            return new ConfigData<>(config.getMd5(), config.getLastModifyTime(), metaList);
        default:
            throw new IllegalStateException("Unexpected groupKey: " + groupKey);
    }
}
The line ConfigDataCache config = CACHE.get(groupKey.name()); looks up the data in the console's cache Map. How the gateway service takes that data and writes it into its own local cache was explained yesterday, so following today's path, where the gateway service calls the /configs/fetch interface to get the data, should not be difficult.
Today's logic is a bit convoluted, and the use of threads and locks is rather clever. I'll spend more time drawing diagrams later.