Learning the Soul gateway: plug-in chains and load balancing

Author: Zhu Ming

Plug-in chain summary

Start with the class structure:

Two of the most basic plug-in classes are:

  • SoulPlugin: Interface that defines the responsibilities of a plug-in, with the execute() method being called by the upper layer and the skip() method allowing plug-ins to be skipped on certain requests.

  • AbstractSoulPlugin: Abstract class that implements the interface's execute() method, defining a generic execution flow, and exposes an abstract doExecute() method for subclasses to plug in their own logic, i.e. the template method design pattern (a minimal sketch of both types follows).
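
A minimal sketch of how the two fit together, with simplified signatures based on the methods discussed in this article (the real definitions live in the plug-in API/base modules listed later and carry more detail):

import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

// Sketch only: in the real project these are separate public types.
interface SoulPlugin {

    // called by the upper layer (the plug-in chain)
    Mono<Void> execute(ServerWebExchange exchange, SoulPluginChain chain);

    // position of the plug-in on the chain
    int getOrder();

    // the plug-in's name, used to look up its cached data
    String named();

    // lets the plug-in be skipped for certain requests
    Boolean skip(ServerWebExchange exchange);
}

abstract class AbstractSoulPlugin implements SoulPlugin {

    // Template method: the generic selector/rule matching flow lives here
    // (the full implementation is shown in the next section).
    @Override
    public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        // ... match a selector and a rule, then delegate to the subclass:
        //     return doExecute(exchange, chain, selectorData, rule);
        // ... otherwise fall through to the next plug-in:
        return chain.execute(exchange);
    }

    // subclasses (DividePlugin, AlibabaDubboPlugin, ...) implement only this
    protected abstract Mono<Void> doExecute(ServerWebExchange exchange, SoulPluginChain chain,
                                            SelectorData selector, RuleData rule);
}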

AbstractSoulPlugin

The execute() method of AbstractSoulPlugin:

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  String pluginName = named();
  final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
  // If pluginData.getEnabled() is false, jump straight to the next plugin; only a few plugins
  // (DividePlugin, AlibabaDubboPlugin, etc.) enter this branch
  if (pluginData != null && pluginData.getEnabled()) {
    // Get all the selectors on the plug-in
    final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
    if (CollectionUtils.isEmpty(selectors)) {
      return CheckUtils.checkSelector(pluginName, exchange, chain);
    }
    // Check whether the request path in the context matches the selector, and get the only selector data that matches
    final SelectorData selectorData = matchSelector(exchange, selectors);
    if (Objects.isNull(selectorData)) {
      if (PluginEnum.WAF.getName().equals(pluginName)) {
        return doExecute(exchange, chain, null, null);
      }
      return CheckUtils.checkSelector(pluginName, exchange, chain);
    }
    if (selectorData.getLoged()) {
      log.info("{} selector success match , selector name :{}", pluginName, selectorData.getName());
    }
    // Get the resource rules in the selector
    final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
    if (CollectionUtils.isEmpty(rules)) {
      if (PluginEnum.WAF.getName().equals(pluginName)) {
        return doExecute(exchange, chain, null, null);
      }
      return CheckUtils.checkRule(pluginName, exchange, chain);
    }
    RuleData rule;
    if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
      rule = rules.get(rules.size() - 1);
    } else {
      // Match the path to get a unique rule
      rule = matchRule(exchange, rules);
    }
    if (Objects.isNull(rule)) {
      return CheckUtils.checkRule(pluginName, exchange, chain);
    }
    if (rule.getLoged()) {
      log.info("{} rule success match ,rule name :{}", pluginName, rule.getName());
    }
    // Execute the subclass method
    return doExecute(exchange, chain, selectorData, rule);
  }
  // Execute the next plug-in on the plug-in chain
  return chain.execute(exchange);
}

Through code analysis, some conclusions can be drawn:

  • execute() has two branches: one matches the request path against the selectors and rules, settles on a single rule, and calls the subclass's doExecute(); the other simply passes control to the next plug-in on the chain.
  • execute() essentially abstracts a common rule-matching flow for the “forwarding” plug-ins; the ones I know of so far are DividePlugin (HTTP requests) and AlibabaDubboPlugin (Dubbo requests). For other types of plug-ins that don't override execute() this way, the chain simply moves on to the next plug-in.

SoulPluginChain

There is another piece: how the plug-ins are assembled into a chain. Let's analyze SoulPluginChain:

The SoulPluginChain interface also defines the execute() method for callers to use, and its only subclass DefaultSoulPluginChain implements chain calls:

public Mono<Void> execute(final ServerWebExchange exchange) {
  return Mono.defer(() -> {
    // Plugins contain all plugins loaded by gateways
    if (this.index < plugins.size()) {
      // Each time the execute() method is called, index increments and the next plug-in is called
      SoulPlugin plugin = plugins.get(this.index++);
      // Use context to determine whether the current plug-in needs to be skipped
      Boolean skip = plugin.skip(exchange);
      if (skip) {
        return this.execute(exchange);
      } else {
        return plugin.execute(exchange, this);
      }
    } else {
      return Mono.empty();
    }
  });
}

DefaultSoulPluginChain is a static inner class of SoulWebHandler, and plugins is a field of SoulWebHandler:

public final class SoulWebHandler implements WebHandler {
    
    private List<SoulPlugin> plugins;

    public SoulWebHandler(final List<SoulPlugin> plugins) {
        this.plugins = plugins;
        // ...
    }
  
    @Override
    public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
        // ...
        return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
                .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
    }

    private static class DefaultSoulPluginChain implements SoulPluginChain {
        // ...
    }
}

So where do plugins in SoulWebHandler come from? We can trace back to where its constructor was called:

@Configuration
public class SoulConfiguration {
    
    @Bean("webHandler")
    public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
        List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
        final List<SoulPlugin> soulPlugins = pluginList.stream()
                .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
        soulPlugins.forEach(soulPlugin -> log.info("loader plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
        return new SoulWebHandler(soulPlugins);
    }
}

You can see that the plug-ins are wired up as Spring beans, i.e. all plug-ins are loaded when the container starts. Using ObjectProvider as the parameter here lazily collects all SoulPlugin-type beans (no error if there are none) and injects them, sorted by order, into SoulWebHandler.

There's a pitfall to watch out for!

All plug-ins, including DividePlugin, AlibabaDubboPlugin, and so on, are registered as beans by an XxPluginConfiguration class in their respective soul-spring-boot-starter-plugin-xx projects, similar to the following example:

@Configuration
public class DividePluginConfiguration {

    @Bean
    public SoulPlugin dividePlugin() {
        return new DividePlugin();
    }
}

So in the gateway project soul-bootstrap, if you need a plug-in, it isn't enough to just enable it in the admin console; the corresponding soul-spring-boot-starter-plugin-xx dependency must also be present in soul-bootstrap's pom.xml. For example:

<dependency>
  <groupId>org.dromara</groupId>
  <artifactId>soul-spring-boot-starter-plugin-divide</artifactId>
  <version>${project.version}</version>
</dependency>

If you comment the dependency out, or it isn't there at all, don't expect to see the plug-in on the chain…

Plug-in project structure

Finally, a brief description of the functions of each plug-in project:

  1. First are the Spring Boot starter projects just mentioned, which hold the bean-registration classes. An overview:

    soul-spring-boot-starter-plugin-alibaba-dubbo
    soul-spring-boot-starter-plugin-apache-dubbo
    soul-spring-boot-starter-plugin-context-path
    soul-spring-boot-starter-plugin-divide
    soul-spring-boot-starter-plugin-global
    soul-spring-boot-starter-plugin-httpclient
    soul-spring-boot-starter-plugin-hystrix
    soul-spring-boot-starter-plugin-monitor
    soul-spring-boot-starter-plugin-ratelimiter
    soul-spring-boot-starter-plugin-resilience4j
    soul-spring-boot-starter-plugin-rewrite
    soul-spring-boot-starter-plugin-sentinel
    soul-spring-boot-starter-plugin-sign
    soul-spring-boot-starter-plugin-sofa
    soul-spring-boot-starter-plugin-springcloud
    soul-spring-boot-starter-plugin-tars
    soul-spring-boot-starter-plugin-waf

    Their main function is to register their own SoulPlugin subclasses as Spring beans, and to register beans for the PluginDataHandler interface used in AbstractSoulPlugin, providing their own implementation subclasses such as DividePluginDataHandler (see the sketch after this list).

  2. The project where the specific plug-in class resides:

    soul-plugin-alibaba-dubbo
    soul-plugin-apache-dubbo
    soul-plugin-api
    soul-plugin-base
    soul-plugin-context-path
    soul-plugin-divide
    soul-plugin-global
    soul-plugin-httpclient
    soul-plugin-hystrix
    soul-plugin-monitor
    soul-plugin-ratelimiter
    soul-plugin-resilience4j
    soul-plugin-rewrite
    soul-plugin-sentinel
    soul-plugin-sign
    soul-plugin-sofa
    soul-plugin-springcloud
    soul-plugin-tars
    soul-plugin-waf

    Take the soul-plugin-divide project as an example: the DividePlugin and DividePluginDataHandler just mentioned live here, alongside the upstream-node cache manager UpstreamCacheManager, the load balancing policy classes (LoadBalance), and so on.
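
As referenced in item 1 above: besides the SoulPlugin bean shown earlier, a starter's configuration class also registers the plug-in's PluginDataHandler implementation as a bean. A hedged sketch reusing the divide example (the exact bean methods in the real starter may differ):

@Configuration
public class DividePluginConfiguration {

    // the plug-in itself, collected by SoulConfiguration#soulWebHandler via ObjectProvider
    @Bean
    public SoulPlugin dividePlugin() {
        return new DividePlugin();
    }

    // its PluginDataHandler implementation, which processes the divide plug-in's selector/rule data as it is synced
    @Bean
    public PluginDataHandler dividePluginDataHandler() {
        return new DividePluginDataHandler();
    }
}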

DividePlugin

The DividePlugin's job is to match HTTP requests. And where there are HTTP requests, there is also forwarding downstream and returning a response, so we will analyze three plug-ins here: DividePlugin, WebClientPlugin, and WebClientResponsePlugin.

First, let’s look at the doExecute() implementation in DividePlugin. Here I only keep the core points:

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
  final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
  final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
  // Get the cluster of service nodes in the cache with the selector ID
  final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
  // Call the load balancing method, passing in the policy type, to get a single node
  // (the ip argument comes from the request's remote address; that line is elided in this trimmed snippet)
  DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
  // Get the node's actual URL and place it in the exchange context
  String domain = buildDomain(divideUpstream);
  String realURL = buildRealURL(domain, soulContext, exchange);
  exchange.getAttributes().put(Constants.HTTP_URL, realURL);
  // Continue calling the next plug-in
  return chain.execute(exchange);
}

As you can see, after the DividePlugin's doExecute() method has run, the ServerWebExchange context already holds the actual path of the downstream service node, and all that's left is to make the request. But hold on: the load balancing strategy here is also key. Let's take a look.

Load balancing

How is load balancing implemented in the Soul gateway? It involves not only the different policies (hash, random, round-robin) but also the concept of a "weight".

These are configured in the admin console; with that background, let's look at the code implementation of each policy.
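
All three strategies expose the same doSelect() signature. A minimal sketch of the shared shape, inferred from the snippets below and the LoadBalance class mentioned earlier (an assumption; the real abstraction in soul-plugin-divide may differ in detail):

import java.util.List;

// Sketch only: names follow the classes referenced in this article.
interface LoadBalance {

    DivideUpstream select(List<DivideUpstream> upstreamList, String ip);
}

abstract class AbstractLoadBalance implements LoadBalance {

    @Override
    public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {
        if (upstreamList == null || upstreamList.isEmpty()) {
            return null;
        }
        if (upstreamList.size() == 1) {
            // a single node needs no balancing
            return upstreamList.get(0);
        }
        return doSelect(upstreamList, ip);
    }

    // each strategy below (hash / random / round-robin) implements only this
    protected abstract DivideUpstream doSelect(List<DivideUpstream> upstreamList, String ip);
}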

Hash

public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
  final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
  for (DivideUpstream address : upstreamList) {
    // Each node is expanded into VIRTUAL_NODE_NUM (default 5) virtual nodes to spread the hash ring more evenly
    for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
      long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
      treeMap.put(addressHash, address);
    }
  }
  // Hash the caller's IP and look in the (ordered) treeMap for entries whose hash is greater than or equal to it
  long hash = hash(String.valueOf(ip));
  SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
  // As long as no service node is added or removed, the same IP always maps to the same node
  if (!lastRing.isEmpty()) {
    return lastRing.get(lastRing.firstKey());
  }
  return treeMap.firstEntry().getValue();
}

The hash load balancing algorithm does not use weights, which means that for a previously unseen IP each node is equally likely to be chosen (and, of course, repeated calls from the same IP always land on the same node).

RandomLoadBalance

public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
  // total number of nodes
  int length = upstreamList.size();
  // total weight
  int totalWeight = 0;
  // whether every node has the same weight
  boolean sameWeight = true;
  for (int i = 0; i < length; i++) {
    int weight = upstreamList.get(i).getWeight();
    // accumulate the total weight
    totalWeight += weight;
    if (sameWeight && i > 0 && weight != upstreamList.get(i - 1).getWeight()) {
      // at least one weight differs
      sameWeight = false;
    }
  }
  if (totalWeight > 0 && !sameWeight) {
    // If weights differ and the total weight is greater than 0, draw a random number within the total weight
    int offset = RANDOM.nextInt(totalWeight);
    // ... and determine which node's weight segment the random value falls on
    for (DivideUpstream divideUpstream : upstreamList) {
      offset -= divideUpstream.getWeight();
      if (offset < 0) {
        return divideUpstream;
      }
    }
  }
  // If all weights are equal (or the total is 0), pick uniformly at random
  return upstreamList.get(RANDOM.nextInt(length));
}

With the random policy, the weights of all nodes are summed and a random number within that total is drawn to see which node's weight segment it falls on; for example, with two nodes weighted 2 and 100, a draw in [0, 2) picks the first node and a draw in [2, 102) picks the second. If the weights are all identical (or the total is 0), it simply picks a uniformly random index across the cluster.

RoundRobinLoadBalance

public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
  String key = upstreamList.get(0).getUpstreamUrl();
  ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
  if (map == null) {
    methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
    map = methodWeightMap.get(key);
  }
  int totalWeight = 0;
  long maxCurrent = Long.MIN_VALUE;
  long now = System.currentTimeMillis();
  DivideUpstream selectedInvoker = null;
  WeightedRoundRobin selectedWRR = null;
  for (DivideUpstream upstream : upstreamList) {
    String rKey = upstream.getUpstreamUrl();
    // Retrieve the node information in the cache
    WeightedRoundRobin weightedRoundRobin = map.get(rKey);
    int weight = upstream.getWeight();
    if (weightedRoundRobin == null) {
      weightedRoundRobin = new WeightedRoundRobin();
      weightedRoundRobin.setWeight(weight);
      map.putIfAbsent(rKey, weightedRoundRobin);
    }
    if (weight != weightedRoundRobin.getWeight()) {
      weightedRoundRobin.setWeight(weight);
    }
    // Here is the first key point: the cached score increases by the current node's weight
    long cur = weightedRoundRobin.increaseCurrent();
    weightedRoundRobin.setLastUpdate(now);
    // Select the node with a high cache score
    if (cur > maxCurrent) {
      maxCurrent = cur;
      selectedInvoker = upstream;
      selectedWRR = weightedRoundRobin;
    }
    totalWeight += weight;
  }
  if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
    try {
      ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
      newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
      methodWeightMap.put(key, newMap);
    } finally {
      updateLock.set(false);
    }
  }
  if (selectedInvoker != null) {
    // Here is the second key point: the selected node's cached score is reduced by the total weight
    selectedWRR.sel(totalWeight);
    return selectedInvoker;
  }
  return upstreamList.get(0);
}

This algorithm is a bit complicated, but let me explain the core aspect of calculating weights:

  • Suppose two nodes, with weights 2 and 100, are registered; each gets an entry in the cache whose score starts at 0.
  • On each pass of the for loop, each node's cached score grows by its own weight. If the later steps were skipped, the cache would read 2 and 100 after the first call, 4 and 200 after the second, and so on.
  • The key third step: the node with the highest cached score is selected and then "penalized" by subtracting the total of all weights, i.e. 102, from its score.

Following the algorithm's steps, nodes that go unselected keep growing from their own base as a "reward", while the selected node is "penalized" by having the sum of all weights subtracted from its score.

It's easy to foresee that a node with a small weight takes a long time to accumulate enough score to be selected, and the moment it is, the heavy penalty sends its hard-won score right back to square one. A node with a large weight, by contrast, pays only a relatively small penalty each time it is selected; even if its score dips too low to win for a few rounds, its per-round reward (its own weight) is so large that it quickly climbs back past the other nodes.
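
The two "key" calls above, increaseCurrent() and sel(totalWeight), live in the per-node WeightedRoundRobin helper kept in the cache. A minimal sketch of what it does, assuming the smooth-weighted behaviour described above (field and method names follow the calls in the snippet; the real class may differ slightly):

// Sketch: a static nested class of RoundRobinLoadBalance, one instance cached per node.
static class WeightedRoundRobin {

    // the node's configured weight
    private int weight;

    // the node's accumulated score
    private final java.util.concurrent.atomic.AtomicLong current = new java.util.concurrent.atomic.AtomicLong(0);

    // timestamp of the last update, used to evict stale entries
    private long lastUpdate;

    void setWeight(final int weight) {
        this.weight = weight;
        this.current.set(0);
    }

    int getWeight() {
        return weight;
    }

    long increaseCurrent() {
        // "reward": the score grows by the node's own weight on every pass
        return current.addAndGet(weight);
    }

    void sel(final int total) {
        // "penalty": the selected node gives back the total weight
        current.addAndGet(-1L * total);
    }

    void setLastUpdate(final long lastUpdate) {
        this.lastUpdate = lastUpdate;
    }

    long getLastUpdate() {
        return lastUpdate;
    }
}

Plugging the list example back in: with weights 2 and 100 (total 102), the heavier node keeps winning while its score hovers just below its weight, and the lighter node creeps up by 2 per round until it finally wins roughly once every 51 selections, which matches the 2 : 100 ratio.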

WebClientPlugin

After the DividePlugin has run, the downstream service node's path is determined and the WebClientPlugin comes into play. It implements the SoulPlugin interface directly and provides its own execute() method (only the core code is kept):

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  String urlPath = exchange.getAttribute(Constants.HTTP_URL);
  // Request method: GET request or POST request
  HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
  // Build a request object shell and inject the request type and URL
  WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
  return handleRequestBody(requestBodySpec, exchange, timeout, chain);
}

private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final SoulPluginChain chain) {
  return requestBodySpec.headers(httpHeaders -> {
    // Add context headers... The following is also to supplement some attributes, do not repeat
    httpHeaders.addAll(exchange.getRequest().getHeaders());
    httpHeaders.remove(HttpHeaders.HOST);
  })
    .contentType(buildMediaType(exchange))
    .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
    // Start an asynchronous HTTP call to the downstream service
    .exchange()
    .doOnError(e -> log.error(e.getMessage()))
    .timeout(Duration.ofMillis(timeout))
    // The callback receives the return value
    .flatMap(e -> doNext(e, exchange, chain));
}

// Here is the asynchronous callback method, working in another thread
private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
  // ...
  // Continue with the rest of the plug-in chain calls
  return chain.execute(exchange);
}

Take a look at the implementation of the exchange() method used in handleRequestBody(); this is where the actual HTTP call is made:

class DefaultWebClient implements WebClient {

  @Override
  public Mono<ClientResponse> exchange() {
    ClientRequest request = (this.inserter != null ?
                             initRequestBuilder().body(this.inserter).build() :
                             initRequestBuilder().build());
    // Here is the key call that goes down into spring-web-reactive
    return Mono.defer(() -> exchangeFunction.exchange(request)
        .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
        .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
  }
}

To summarize, the WebClientPlugin asynchronously invokes the downstream service, waits for the response, and then continues the rest of the plug-in chain on another thread.

WebClientResponsePlugin

Finally, the plug-in chain reaches the WebClientResponsePlugin, which assembles the response:

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  return chain.execute(exchange).then(Mono.defer(() -> {
    // Get the response information stored in the context
    ServerHttpResponse response = exchange.getResponse();
    ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
    if (Objects.isNull(clientResponse)
        || response.getStatusCode() == HttpStatus.BAD_GATEWAY
        || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
      Object error = SoulResultWarp.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
      return WebFluxResultUtils.result(exchange, error);
    } else if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
      Object error = SoulResultWarp.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
      return WebFluxResultUtils.result(exchange, error);
    }
    // Assemble the response: status code, cookies, headers, and body
    response.setStatusCode(clientResponse.statusCode());
    response.getCookies().putAll(clientResponse.cookies());
    response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
    return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
  }));
}
Copy the code