This article explores the underlying principles of Soul’s Divide plug-in, load balancing, and IP port probing.
- Start soul-admin and use the default configuration to synchronize data in websocket mode
- Start soul-bootstrap and use the default configuration to synchronize data in websocket mode
- Start the soul – examples – HTTP
The underlying principle of divide:
1, the soul – examples – HTTP starts to soul – PluginList after the admin – > divide load data analysis principle
When soul-examples-http is started, After loading beans will call SpringMvcClientBeanPostProcessor postProcessAfterInitialization method (implements the BeanPostProcessor postProcessAfterIniti Soul-admin: /soul-client/ springmVC-register: /soul-client: / springmVC-register: Show the post-pluginlist -> Divide to soul-admin. The following code
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor { private final ThreadPoolExecutor executorService; private final String url; private final SoulSpringMvcConfig soulSpringMvcConfig; /** * Instantiates a new Soul client bean post processor. * * @param soulSpringMvcConfig the soul spring mvc config */ public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) { ValidateUtils.validate(soulSpringMvcConfig); this.soulSpringMvcConfig = soulSpringMvcConfig; url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register"; executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); } @Override public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException { if (soulSpringMvcConfig.isFull()) { return bean; } Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class); RestController restController = AnnotationUtils.findAnnotation(bean.getClass(), RestController.class); RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class); if (controller ! = null || restController ! = null || requestMapping ! = null) { SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class); String prePath = ""; if (Objects.nonNull(clazzAnnotation)) { if (clazzAnnotation.path().indexOf("*") > 1) { String finalPrePath = prePath; executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url, RpcTypeEnum.HTTP)); return bean; } prePath = clazzAnnotation.path(); } final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass()); for (Method method : methods) { SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class); if (Objects.nonNull(soulSpringMvcClient)) { String finalPrePath = prePath; executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url, RpcTypeEnum.HTTP)); } } } return bean; } private String buildJsonParams(final SoulSpringMvcClient soulSpringMvcClient, final String prePath) { String contextPath = soulSpringMvcConfig.getContextPath(); String appName = soulSpringMvcConfig.getAppName(); Integer port = soulSpringMvcConfig.getPort(); String path = contextPath + prePath + soulSpringMvcClient.path(); String desc = soulSpringMvcClient.desc(); String configHost = soulSpringMvcConfig.getHost(); String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost; String configRuleName = soulSpringMvcClient.ruleName(); String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName; SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder() .context(contextPath) .host(host) .port(port) .appName(appName) .path(path) .pathDesc(desc) .rpcType(soulSpringMvcClient.rpcType()) .enabled(soulSpringMvcClient.enabled()) .ruleName(ruleName) .registerMetaData(soulSpringMvcClient.registerMetaData()) .build(); return OkHttpTools.getInstance().getGson().toJson(registerDTO); }}Copy the code
@RestController @RequestMapping("/soul-client") public class SoulClientController { private final SoulClientRegisterService soulClientRegisterService; /** * Instantiates a new Soul client controller. * * @param soulClientRegisterService the soul client register service */ public SoulClientController(final SoulClientRegisterService soulClientRegisterService) { this.soulClientRegisterService = soulClientRegisterService; } /** * Register spring mvc string. * * @param springMvcRegisterDTO the spring mvc register dto * @return the string */ @PostMapping("/springmvc-register") public String registerSpringMvc(@RequestBody final SpringMvcRegisterDTO springMvcRegisterDTO) { return soulClientRegisterService.registerSpringMvc(springMvcRegisterDTO); },,}Copy the code
@Override @Transactional public String registerSpringMvc(final SpringMvcRegisterDTO dto) { if (dto.isRegisterMetaData()) { MetaDataDO exist = metaDataMapper.findByPath(dto.getPath()); if (Objects.isNull(exist)) { saveSpringMvcMetaData(dto); } } String selectorId = handlerSpringMvcSelector(dto); handlerSpringMvcRule(selectorId, dto); return SoulResultMessage.SUCCESS; }Copy the code
private void saveSpringMvcMetaData(final SpringMvcRegisterDTO dto) {
Timestamp currentTime = new Timestamp(System.currentTimeMillis());
MetaDataDO metaDataDO = MetaDataDO.builder()
.appName(dto.getAppName())
.path(dto.getPath())
.pathDesc(dto.getPathDesc())
.rpcType(dto.getRpcType())
.enabled(dto.isEnabled())
.id(UUIDUtils.getInstance().generateShortUuid())
.dateCreated(currentTime)
.dateUpdated(currentTime)
.build();
metaDataMapper.insert(metaDataDO);
// publish AppAuthData's event
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.META_DATA, DataEventTypeEnum.CREATE,
Collections.singletonList(MetaDataTransfer.INSTANCE.mapToData(metaDataDO))));
}
Copy the code
private void handlerSpringMvcRule(final String selectorId, final SpringMvcRegisterDTO dto) { RuleDO ruleDO = ruleMapper.findByName(dto.getRuleName()); if (Objects.isNull(ruleDO)) { registerRule(selectorId, dto.getPath(), dto.getRpcType(), dto.getRuleName()); }}Copy the code
private void registerRule(final String selectorId, final String path, final String rpcType, final String ruleName) {
RuleHandle ruleHandle = RuleHandleFactory.ruleHandle(RpcTypeEnum.acquireByName(rpcType), path);
RuleDTO ruleDTO = RuleDTO.builder()
.selectorId(selectorId)
.name(ruleName)
.matchMode(MatchModeEnum.AND.getCode())
.enabled(Boolean.TRUE)
.loged(Boolean.TRUE)
.sort(1)
.handle(ruleHandle.toJson())
.build();
RuleConditionDTO ruleConditionDTO = RuleConditionDTO.builder()
.paramType(ParamTypeEnum.URI.getName())
.paramName("/")
.paramValue(path)
.build();
if (path.indexOf("*") > 1) {
ruleConditionDTO.setOperator(OperatorEnum.MATCH.getAlias());
} else {
ruleConditionDTO.setOperator(OperatorEnum.EQ.getAlias());
}
ruleDTO.setRuleConditions(Collections.singletonList(ruleConditionDTO));
ruleService.register(ruleDTO);
}
Copy the code
2. Use soul-bootstrap to access soul-examples-HTTP
In the soul-admin background, System Manage->Plugin we only turn on divide. Soul-bootstrap is the gateway for all web requests to access soul-examples HTTP. It must be forwarded in the Handle method of the WebHandler. We found SoulWebHandler, the implementation class of WebHandler, and found that the handle method in it queried all soulPlugins. After determining that the Plugin was enabled, execute the execute method in it. Query the corresponding Selector and Rule data. Traverse the resolution to the corresponding URI for the request.
public final class SoulWebHandler implements WebHandler { private final List<SoulPlugin> plugins; private final Scheduler scheduler; /** * Instantiates a new Soul web handler. * * @param plugins the plugins */ public SoulWebHandler(final List<SoulPlugin> plugins) { this.plugins = plugins; String schedulerType = System.getProperty("soul.scheduler.type", "fixed"); if (Objects.equals(schedulerType, "fixed")) { int threads = Integer.parseInt(System.getProperty( "soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16))); scheduler = Schedulers.newParallel("soul-work-threads", threads); } else { scheduler = Schedulers.elastic(); } } /** * Handle the web server exchange. * * @param exchange the current server exchange * @return {@code Mono<Void>} to indicate when request handling is complete */ @Override public Mono<Void> handle(@NonNull final ServerWebExchange exchange) { MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName()); Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName()); return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler) .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time))); } private static class DefaultSoulPluginChain implements SoulPluginChain { private int index; private final List<SoulPlugin> plugins; /** * Instantiates a new Default soul plugin chain. * * @param plugins the plugins */ DefaultSoulPluginChain(final List<SoulPlugin> plugins) { this.plugins = plugins; } /** * Delegate to the next {@code WebFilter} in the chain. * * @param exchange the current server exchange * @return {@code Mono<Void>} to indicate when request handling is complete */ @Override public Mono<Void> execute(final ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < plugins.size()) { SoulPlugin plugin = plugins.get(this.index++); Boolean skip = plugin.skip(exchange); if (skip) { return this.execute(exchange); } return plugin.execute(exchange, this); } return Mono.empty(); }); }}}Copy the code
@Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) { String body = exchange.getAttribute(Constants.DUBBO_PARAMS); SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext ! = null; MetaData metaData = exchange.getAttribute(Constants.META_DATA); if (! checkMetaData(metaData)) { assert metaData ! = null; log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString()); exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) { exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } Object result = alibabaDubboProxyService.genericInvoker(body, metaData); if (Objects.nonNull(result)) { exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, result); } else { exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, Constants.DUBBO_RPC_RESULT_EMPTY); } exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName()); return chain.execute(exchange); }Copy the code
private RuleData matchRule(final ServerWebExchange exchange, final Collection<RuleData> rules) {
return rules.stream()
.filter(rule -> filterRule(rule, exchange))
.findFirst().orElse(null);
}
Copy the code
private Boolean filterRule(final RuleData ruleData, final ServerWebExchange exchange) {
return ruleData.getEnabled() && MatchStrategyUtils.match(ruleData.getMatchMode(), ruleData.getConditionDataList(), exchange);
}
Copy the code
public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
String matchMode = MatchModeEnum.getMatchModeByCode(strategy);
MatchStrategy matchStrategy = ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);
return matchStrategy.match(conditionDataList, exchange);
}
Copy the code
@Override
public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
return conditionDataList
.stream()
.allMatch(condition -> OperatorJudgeFactory.judge(condition, buildRealData(condition, exchange)));
}
Copy the code
String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {
String realData = "";
ParamTypeEnum paramTypeEnum = ParamTypeEnum.getParamTypeEnumByName(condition.getParamType());
switch (paramTypeEnum) {
case HEADER:
final HttpHeaders headers = exchange.getRequest().getHeaders();
final List<String> list = headers.get(condition.getParamName());
if (CollectionUtils.isEmpty(list)) {
return realData;
}
realData = Objects.requireNonNull(headers.get(condition.getParamName())).stream().findFirst().orElse("");
break;
case URI:
realData = exchange.getRequest().getURI().getPath();
break;
case QUERY:
final MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
realData = queryParams.getFirst(condition.getParamName());
break;
case HOST:
realData = HostAddressUtils.acquireHost(exchange);
break;
case IP:
realData = HostAddressUtils.acquireIp(exchange);
break;
case POST:
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
realData = (String) ReflectUtils.getFieldValue(soulContext, condition.getParamName());
break;
default:
break;
}
return realData;
}
Copy the code
public class MatchOperatorJudge implements OperatorJudge { @Override public Boolean judge(final ConditionData conditionData, final String realData) { if (Objects.equals(ParamTypeEnum.URI.getName(), conditionData.getParamType())) { return PathMatchUtils.match(conditionData.getParamValue().trim(), realData); } return realData.contains(conditionData.getParamValue().trim()); }}Copy the code
Debug verification is performed as follows. The inference is correct:
Load balancing for Divide:
To verify load balancing, let’s launch another soul-examples-http with port number 8189. Soul-admin’s PluginList-> Divide SelectList is now registered. By default, registered HTTP requests are random and stored in the Handle of the rule. The registered service node information is stored in the Selector handle. The process for starting registration is the same as above
Code analysis:
The new startup gateway invocation logic is the same as above, and the load balancing algorithm code department is as follows:
@Slf4j public class DividePlugin extends AbstractSoulPlugin { @Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) { final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext ! = null; final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class); final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId()); Log. Error ("divide upstream configuration error: {}", rule.toString()); Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress(); // Load balancing algorithm, Computing call which service DivideUpstream DivideUpstream = LoadBalanceUtils. The selector (upstreamList, ruleHandle getLoadBalance (), IP); if (Objects.isNull(divideUpstream)) { log.error("divide has no upstream"); Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } // set the http url String domain = buildDomain(divideUpstream); String realURL = buildRealURL(domain, soulContext, exchange); exchange.getAttributes().put(Constants.HTTP_URL, realURL); // set the http timeout exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout()); exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry()); return chain.execute(exchange); },,}Copy the code
public class LoadBalanceUtils { /** * Selector divide upstream. * * @param upstreamList the upstream list * @param algorithm the loadBalance algorithm * @param ip the ip * @return the divide upstream */ public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) { LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm); return loadBalance.select(upstreamList, ip); }}Copy the code
public abstract class AbstractLoadBalance implements LoadBalance {
/**
* Do select divide upstream.
*
* @param upstreamList the upstream list
* @param ip the ip
* @return the divide upstream
*/
protected abstract DivideUpstream doSelect(List<DivideUpstream> upstreamList, String ip);
@Override
public DivideUpstream select(final List<DivideUpstream> upstreamList, final String ip) {
if (CollectionUtils.isEmpty(upstreamList)) {
return null;
}
if (upstreamList.size() == 1) {
return upstreamList.get(0);
}
return doSelect(upstreamList, ip);
}
protected int getWeight(final DivideUpstream upstream) {
if (!upstream.isStatus()) {
return 0;
}
int weight = getWeight(upstream.getTimestamp(), getWarmup(upstream.getWarmup(), Constants.DEFAULT_WARMUP), upstream.getWeight());
return weight;
}
private int getWeight(final long timestamp, final int warmup, final int weight) {
if (weight > 0 && timestamp > 0) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
if (uptime > 0 && uptime < warmup) {
return calculateWarmupWeight(uptime, warmup, weight);
}
}
return weight;
}
private int getWarmup(final int warmup, final int defaultWarmup) {
if (warmup > 0) {
return warmup;
}
return defaultWarmup;
}
private int calculateWarmupWeight(final int uptime, final int warmup, final int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
}
Copy the code
@Join public class RandomLoadBalance extends AbstractLoadBalance { private static final Random RANDOM = new Random(); @Override public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) { int totalWeight = calculateTotalWeight(upstreamList); boolean sameWeight = isAllUpStreamSameWeight(upstreamList); if (totalWeight > 0 && ! sameWeight) { return random(totalWeight, upstreamList); } // If the weights are the same or the weights are 0 then random return random(upstreamList); } private boolean isAllUpStreamSameWeight(final List<DivideUpstream> upstreamList) { boolean sameWeight = true; int length = upstreamList.size(); for (int i = 0; i < length; i++) { int weight = getWeight(upstreamList.get(i)); if (i > 0 && weight ! = getWeight(upstreamList.get(i - 1))) { // Calculate whether the weight of ownership is the same sameWeight = false; break; } } return sameWeight; } private int calculateTotalWeight(final List<DivideUpstream> upstreamList) { // total weight int totalWeight = 0; for (DivideUpstream divideUpstream : upstreamList) { int weight = getWeight(divideUpstream); // Cumulative total weight totalWeight += weight; } return totalWeight; } private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) { // If the weights are not the same and the weights are greater than 0, then random by the total number of weights int offset = RANDOM.nextInt(totalWeight); // Determine which segment the random value falls on for (DivideUpstream divideUpstream : upstreamList) { offset -= getWeight(divideUpstream); if (offset < 0) { return divideUpstream; } } return upstreamList.get(0); } private DivideUpstream random(final List<DivideUpstream> upstreamList) { return upstreamList.get(RANDOM.nextInt(upstreamList.size())); }}Copy the code
Soul-examples-ip port exploration for the HTTP service
This is pending. I’ll write it later