The Soul gateway learns plug-in chains and load balancing parsing
Author: Zhu Ming
Plug-in chain summary
Start with a class diagram:
Two of the most basic plug-in classes are:
-
SoulPlugin: Interface that defines the responsibilities of a plug-in, with the execute() method being called by the upper layer and the skip() method allowing plug-ins to be skipped on certain requests.
-
AbstractPlugin: Abstract class that implements execute() for an interface, defines a generic execution flow, and provides doExecute() abstract methods for implementing classes to write their own logic using the template method design pattern.
AbstractSoulPlugin
AbstractSoulPlugin class execute()
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
// If plugindata.getenabled () is false, it jumps directly to the next plugin, and only a few plugins enter this condition (DividePlugin, AlibabaDubboPlugin, etc.)
if(pluginData ! =null && pluginData.getEnabled()) {
// Get all the selectors on the plug-in
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return CheckUtils.checkSelector(pluginName, exchange, chain);
}
// Check whether the request path in the context matches the selector, and get the only selector data that matches
final SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
if (PluginEnum.WAF.getName().equals(pluginName)) {
return doExecute(exchange, chain, null.null);
}
return CheckUtils.checkSelector(pluginName, exchange, chain);
}
if (selectorData.getLoged()) {
log.info("{} selector success match , selector name :{}", pluginName, selectorData.getName());
}
// Get the resource rules in the selector
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
if (CollectionUtils.isEmpty(rules)) {
if (PluginEnum.WAF.getName().equals(pluginName)) {
return doExecute(exchange, chain, null.null);
}
return CheckUtils.checkRule(pluginName, exchange, chain);
}
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
rule = rules.get(rules.size() - 1);
} else {
// Match the path to get a unique rule
rule = matchRule(exchange, rules);
}
if (Objects.isNull(rule)) {
return CheckUtils.checkRule(pluginName, exchange, chain);
}
if (rule.getLoged()) {
log.info("{} rule success match ,rule name :{}", pluginName, rule.getName());
}
// Execute the subclass method
return doExecute(exchange, chain, selectorData, rule);
}
// Execute the next plug-in on the plug-in chain
return chain.execute(exchange);
}
Copy the code
Through code analysis, some conclusions can be drawn:
- Execute() has two logic: one is to request a path match with the selector and the rule, finally confirm a unique rule, and call the subclass doExecute(); The second is to execute the next plug-in on the plug-in chain.
- Execute () actually abstracts a set of rule matching logic for all “forward type” plug-ins, which I know of so far
DividePlugin
(HTTP request) andAlibabaDubboPlugin
(Dubbo request), other types of plug-ins will simply move on to the next plug-in if they don’t override execute().
SoulPluginChain
There is also a point where plugins are formed and chained. Let’s analyze the SoulPluginChain:
The SoulPluginChain interface also defines the execute() method for callers to use, and its only subclass DefaultSoulPluginChain implements chain calls:
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
// Plugins contain all plugins loaded by gateways
if (this.index < plugins.size()) {
// Each time the execute() method is called, index increments and the next plug-in is called
SoulPlugin plugin = plugins.get(this.index++);
// Use context to determine whether the current plug-in needs to be skipped
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
} else {
return plugin.execute(exchange, this); }}else {
returnMono.empty(); }}); }Copy the code
DefaultSoulPluginChain is a static inner class of SoulWebHandler, and plugins are a property of SoulWebHandle:
public final class SoulWebHandler implements WebHandler {
private List<SoulPlugin> plugins;
public SoulWebHandler(final List<SoulPlugin> plugins) {
this.plugins = plugins;
// ...
}
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
// ...
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
private static class DefaultSoulPluginChain implements SoulPluginChain {}}Copy the code
So where do plugins in SoulWebHandler come from? We can trace back to where its constructor was called:
@Configuration
public class SoulConfiguration {
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
final List<SoulPlugin> soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("loader plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return newSoulWebHandler(soulPlugins); }}Copy the code
You can see that the plugins are started as a Spring Bean, that is, all plug-ins are loaded when the container starts. Using ObjectProvider as the entry parameter here lazily loads all soulPlugin-type beans (no errors if none) and injects them into SoulWebHandler.
There’s a pothole to watch out for!
All plug-ins, including DividePlugin, AlibabaDubboPlugin, and so on, are configured by the XXPluginConfiguration class in their soul-spring-boot-starter-plugin-XX projects. Register your plug-in as a Bean, similar to the following example:
@Configuration
public class DividePluginConfiguration {
@Bean
public SoulPlugin dividePlugin(a) {
return newDividePlugin(); }}Copy the code
So in the gateway project soul-Bootstrap, if you need a plug-in, you don’t just start it in the admin background, The soul-spring-boot-starter-plugin-xx dependency exists in the pum. XML of the soul-bootstrap plugin. For example:
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-divide</artifactId>
<version>${project.version}</version>
</dependency>
Copy the code
If you annotate it or it doesn’t exist, don’t expect to see it on the plugin chain…
Plug-in project structure
Finally, a brief description of the functions of each plug-in project:
-
The first is the spring Bean startup class project just mentioned, giving an outline:
soul-spring-boot-starter-plugin-alibaba-dubbo soul-spring-boot-starter-plugin-apache-dubbo soul-spring-boot-starter-plugin-context-path soul-spring-boot-starter-plugin-divide soul-spring-boot-starter-plugin-global soul-spring-boot-starter-plugin-httpclient soul-spring-boot-starter-plugin-hystrix soul-spring-boot-starter-plugin-monitor soul-spring-boot-starter-plugin-ratelimiter soul-spring-boot-starter-plugin-resilience4j soul-spring-boot-starter-plugin-rewrite soul-spring-boot-starter-plugin-sentinel soul-spring-boot-starter-plugin-sign soul-spring-boot-starter-plugin-sofa soul-spring-boot-starter-plugin-springcloud soul-spring-boot-starter-plugin-tars soul-spring-boot-starter-plugin-waf Copy the code
Their main function is to register their own SoulPlugin subclasses as Spring beans, and to register spring beans to PluginDataHandler interfaces called in AbstractSoulPlugin. Provide its own implementation subclasses, such as DividePluginDataHandler.
-
The project where the specific plug-in class resides:
soul-plugin-alibaba-dubbo soul-plugin-apache-dubbo soul-plugin-api soul-plugin-base soul-plugin-context-path soul-plugin-divide soul-plugin-global soul-plugin-httpclient soul-plugin-hystrix soul-plugin-monitor soul-plugin-ratelimiter soul-plugin-resilience4j soul-plugin-rewrite soul-plugin-sentinel soul-plugin-sign soul-plugin-sofa soul-plugin-springcloud soul-plugin-tars soul-plugin-waf Copy the code
Take the soul-plugin-divide project as an example. The DividePlugin and DividePluginDataHandler just mentioned are among them. The project also has node information cache manager UpstreamCacheManager, load balancing policy class LoadBalance, and so on.
DividePlugin
The function of the DividePlugin is to match Http requests. Since there are Http requests, there are also forwarding downstream and returning responses, so we will analyze three plug-ins here: DividePlugin, WebClientPlugin, WebClientResponsePlugin.
First, let’s look at the doExecute() implementation in DividePlugin. Here I only keep the core points:
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
// Get the cluster of service nodes in the cache with the selector ID
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
// Call the load balancing method and pass in the policy type to get a unique node
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
// Get the actual URL of the node and place it in the Exchange context
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
// Continue calling the next plug-in
return chain.execute(exchange);
}
Copy the code
As you can see, after executing the DividePlugin’s doExecute() method, we already have the actual path to the downstream service node in our ServerWebExchange context, and we just need to request it. But before you worry, the load balancing strategy is also key here. Let’s take a look.
Load balancing
How to implement load balancing on the Soul gateway? Not only does it involve various policies (hasn’t, random, polling), but also the concept of “weight score”.
After showing the background configuration, let’s look at the code implementation of each policy.
Hash
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
for (DivideUpstream address : upstreamList) {
// Each node *VIRTUAL_NODE_NUM(default 5) to make hash more uniform
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-"+ i); treeMap.put(addressHash, address); }}// Get a hash value from the current IP and compare treemap(ordered) to find the hash value greater than this value
long hash = hash(String.valueOf(ip));
SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
// As long as the number of service nodes is not increased or decreased, the number of nodes obtained by the same IP can remain unchanged
if(! lastRing.isEmpty()) {return lastRing.get(lastRing.firstKey());
}
return treeMap.firstEntry().getValue();
}
Copy the code
Load balancing of hash algorithms does not use the concept of “weight score”, which means that each node is equally likely to be accessed for each unknown IP address (of course, multiple calls to the same IP address will only access the same node).
RandomLoadBalance
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
/ / the total number
int length = upstreamList.size();
/ / total weight
int totalWeight = 0;
// All weights are the same
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
int weight = upstreamList.get(i).getWeight();
// Total weight
totalWeight += weight;
if (sameWeight && i > 0&& weight ! = upstreamList.get(i -1).getWeight()) {
// Calculate whether the ownership weight is the same
sameWeight = false; }}if (totalWeight > 0 && !sameWeight) {
// If the weights are not the same and the weights are greater than 0, the total weight is random
int offset = RANDOM.nextInt(totalWeight);
// And determine which segment the random value falls on
for (DivideUpstream divideUpstream : upstreamList) {
offset -= divideUpstream.getWeight();
if (offset < 0) {
returndivideUpstream; }}}// If the weight is the same or 0, the parity is random
return upstreamList.get(RANDOM.nextInt(length));
}
Copy the code
When the random rule is used, the weight points of all nodes are accumulated and a number is randomly obtained to see which node falls on the weight segment. If the score is 0 or the same, the random cluster length is straightforward.
RoundRobinLoadBalance
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
String key = upstreamList.get(0).getUpstreamUrl();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
DivideUpstream selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (DivideUpstream upstream : upstreamList) {
String rKey = upstream.getUpstreamUrl();
// Retrieve the node information in the cache
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
int weight = upstream.getWeight();
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(rKey, weightedRoundRobin);
}
if(weight ! = weightedRoundRobin.getWeight()) { weightedRoundRobin.setWeight(weight); }// Here is the first key: the score in the cache increases the current node weight score
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
// Select the node with a high cache score
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if(! updateLock.get() && upstreamList.size() ! = map.size() && updateLock.compareAndSet(false.true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false); }}if(selectedInvoker ! =null) {
// Here is the second key: the score in the cache, reducing the total node weight score
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
return upstreamList.get(0);
}
Copy the code
This algorithm is a bit complicated, but let me explain the core aspect of calculating weights:
- Two nodes with a score of 2 and 100 are entered and retained in the cache respectively, with a score starting from 0
- After the for loop, the points of the two nodes in the cache will increase from their own base. Assuming the following steps are not performed, the cache will be 2, 100 for the first time, 4, 200 for the second time, and so on.
- The key third step is to select the node with the highest score in the cache and carry out “penalty” measures to reduce the cumulative score of all nodes, i.e. 102.
According to the algorithm’s steps, the nodes that remain unselected, as “rewards for growth,” continue to grow from their own base; The selected node, as a “penalty “, reduces the sum of the other nodes’ weights.
It can be foreseen that the nodes with small weight points will increase to a long time before they are selected. However, at that moment it will be punished very heavily, leading to the long accumulation of power once it returns to the pre-liberation period. However, the node with a large weight score will have a small penalty for being selected each time. Even if the score is too low to be selected after several times, its reward score (itself) will be extremely high, and its increase will far surpass other nodes.
WebClientPlugin
After the DividePlugin is invoked, the downstream service node path is determined, and the WebClientPlugin comes into play. It implements the SoulPlugin interface directly and implements the execute() method (leaving only the core code):
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String urlPath = exchange.getAttribute(Constants.HTTP_URL);
// Request types: Get request orPost request
HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
// Build a request object shell and inject the request type and URL
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
return handleRequestBody(requestBodySpec, exchange, timeout, chain);
}
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
final ServerWebExchange exchange,
final long timeout,
final SoulPluginChain chain) {
return requestBodySpec.headers(httpHeaders -> {
// Add context headers... The following is also to supplement some attributes, do not repeat
httpHeaders.addAll(exchange.getRequest().getHeaders());
httpHeaders.remove(HttpHeaders.HOST);
})
.contentType(buildMediaType(exchange))
.body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
// Start an asynchronous HTTP call to the downstream service
.exchange()
.doOnError(e -> log.error(e.getMessage()))
.timeout(Duration.ofMillis(timeout))
// The callback receives the return value
.flatMap(e -> doNext(e, exchange, chain));
}
// Here is the asynchronous callback method, working in another thread
private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
// ...
// Continue with the rest of the plug-in chain calls
return chain.execute(exchange);
}
Copy the code
Take a look at the implementation of the Exchange () method in handleRequestBody(). Here are the key Http calls:
class DefaultWebClient implements WebClient {
@Override
public Mono<ClientResponse> exchange(a) {
ClientRequest request = (this.inserter ! =null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
// Here is the key call that goes to Spring-web-Reactive
return Mono.defer(() -> exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + "" + this.uri + " [DefaultWebClient]") .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR)); }}Copy the code
To summarize, the processing of the WebClientPlugin asynchronously invokes the downstream service, waits for the response, and then executes subsequent plug-in chain calls in another thread.
WebClientResponseClient
Finally, the plugin chain goes to the WebClientResponseClient loop, which encapsulates the response information:
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
return chain.execute(exchange).then(Mono.defer(() -> {
// Get the response information stored in the context
ServerHttpResponse response = exchange.getResponse();
ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
if (Objects.isNull(clientResponse)
|| response.getStatusCode() == HttpStatus.BAD_GATEWAY
|| response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
Object error = SoulResultWarp.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
} else if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
Object error = SoulResultWarp.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// all kinds of assembly
response.setStatusCode(clientResponse.statusCode());
response.getCookies().putAll(clientResponse.cookies());
response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
}));
}
Copy the code