Introduction
After exploring the request processing flow in the previous article, today we look at the Resilience4J plugin, which handles rate limiting.
Running the sample
Environment configuration
Start MySQL and Redis
docker run -dit --name redis -p 6379:6379 redis
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
soul-admin startup and related configuration
Run soul-admin and open the admin console: System Administration > Plug-in Management > Resilience4J, click Edit, and enable the plugin.
Add a selector and a rule. Here we mirror the matching conditions used for the Divide plugin, so that the interfaces under the /http prefix are rate limited (the official HTTP example is used for testing).
Rule configuration: the token filling number must be greater than 0; otherwise an error is reported.
Circuit enable: set it to 0 to use the rate limiting logic (as the doExecute code later in this article shows, 1 switches to the combined path).
Others: set the fallback URI to any path and set the remaining parameters to 1.
soul-bootstrap configuration and startup
Add the plugin dependency to soul-bootstrap, as follows:
<!-- soul resilience4j plugin start -->
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>soul-spring-boot-starter-plugin-resilience4j</artifactId>
    <version>${last.version}</version>
</dependency>
<!-- soul resilience4j plugin end -->
Start soul-bootstrap
HTTP sample startup
Launch: soul-examples -> SoulTestHttpApplication
On the Divide page of the admin console, you can see the registered interfaces.
Visit: http://127.0.0.1:9195/http/order/findById?id=1111
The request succeeds with the response below, so we can start debugging the source.
{
    "id": "1111",
    "name": "hello world findById"
}
Source code debugging
Tracing where rate limiting sits in the processing sequence
From the previous article we have a clear picture of the processing flow. Debugging shows that RateLimiterPlugin extends AbstractSoulPlugin, so it first goes through the selector and rule matching logic shown below. The rate limiting logic in doExecute runs only after a successful match.
# AbstractSoulPlugin
// The route is matched first
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    String pluginName = named();
    final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
    if (pluginData != null && pluginData.getEnabled()) {
        final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
        if (CollectionUtils.isEmpty(selectors)) {
            return CheckUtils.checkSelector(pluginName, exchange, chain);
        }
        final SelectorData selectorData = matchSelector(exchange, selectors);
        if (Objects.isNull(selectorData)) {
            if (PluginEnum.WAF.getName().equals(pluginName)) {
                return doExecute(exchange, chain, null, null);
            }
            return CheckUtils.checkSelector(pluginName, exchange, chain);
        }
        if (selectorData.getLoged()) {
            log.info("{} selector success match , selector name :{}", pluginName, selectorData.getName());
        }
        final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        if (CollectionUtils.isEmpty(rules)) {
            if (PluginEnum.WAF.getName().equals(pluginName)) {
                return doExecute(exchange, chain, null, null);
            }
            return CheckUtils.checkRule(pluginName, exchange, chain);
        }
        RuleData rule;
        if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
            //get last
            rule = rules.get(rules.size() - 1);
        } else {
            rule = matchRule(exchange, rules);
        }
        if (Objects.isNull(rule)) {
            return CheckUtils.checkRule(pluginName, exchange, chain);
        }
        if (rule.getLoged()) {
            log.info("{} rule success match ,rule name :{}", pluginName, rule.getName());
        }
        return doExecute(exchange, chain, selectorData, rule);
    }
    return chain.execute(exchange);
}
# RateLimiterPlugin
// Rate limiting logic, reached only after the match succeeds
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    // Parse the settings filled in on the rule page from the rule's handle JSON
    Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
    // If circuit enable is 1, take the combined path; if it is 0, take the rate limiting path
    if (resilience4JHandle.getCircuitEnable() == 1) {
        return combined(exchange, chain, rule);
    }
    return rateLimiter(exchange, chain, rule);
}
// This part is more involved, so we keep tracing into it
private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
    return ratelimiterExecutor.run(
            chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
            .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
}
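To make the control flow of the two methods above easier to hold in mind, here is a minimal JDK-only sketch of the same idea: pick the first matching selector, then the first matching rule inside it, and only then hand over to the plugin-specific logic. Every type and name in it (PluginMatchSketch, Selector, Rule, sampleSelectors) is hypothetical and invented for illustration; it leaves out the FULL_FLOW shortcut, the WAF special case, and logging, and it is not Soul's actual implementation.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical, simplified model of "match a selector, then a rule, then call doExecute". */
final class PluginMatchSketch {

    /** A rule: a name plus a condition on the request path (illustrative type). */
    static final class Rule {
        final String name;
        final Predicate<String> condition;

        Rule(final String name, final Predicate<String> condition) {
            this.name = name;
            this.condition = condition;
        }
    }

    /** A selector: a coarse condition on the path plus the rules that belong to it. */
    static final class Selector {
        final Predicate<String> condition;
        final List<Rule> rules;

        Selector(final Predicate<String> condition, final List<Rule> rules) {
            this.condition = condition;
            this.rules = rules;
        }
    }

    /** Returns the name of the rule that would reach doExecute, or null when the plugin is skipped. */
    static String match(final String path, final List<Selector> selectors) {
        for (Selector selector : selectors) {
            if (!selector.condition.test(path)) {
                continue;
            }
            // The first matching selector wins; then look for the first matching rule inside it.
            for (Rule rule : selector.rules) {
                if (rule.condition.test(path)) {
                    return rule.name;
                }
            }
            return null;
        }
        return null;
    }

    /** Sample data: one selector for the /http prefix with a single rule for /http/order. */
    static List<Selector> sampleSelectors() {
        Rule orderRule = new Rule("order-rule", p -> p.startsWith("/http/order"));
        return Collections.singletonList(
                new Selector(p -> p.startsWith("/http/"), Collections.singletonList(orderRule)));
    }

    public static void main(final String[] args) {
        System.out.println(match("/http/order/findById", sampleSelectors())); // order-rule
        System.out.println(match("/dubbo/findAll", sampleSelectors()));       // null
    }
}
```

In the sketch, a null result corresponds to the branches above that return through CheckUtils instead of calling doExecute.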
The rateLimiter method is a bit confusing at first. I do not yet understand reactive programming, but the overall rate limiting logic is still understandable.
public class RateLimiterExecutor implements Executor {
    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Generate the rate limiter
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // The rate limiting logic is presumably triggered here
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}
Moving into the class above, the logic that generates the limiter is obvious, but it is not obvious where the limiter is actually triggered before the Mono is returned. Without a grounding in reactive programming I could not locate the actual trigger code, but my guess is that it is triggered at the line marked by the second comment above.
Because the code is reactive, stepping through it directly does not work, so we take another route and look at what the concrete rate limiting logic looks like.
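One reason the trigger is hard to spot is that run only assembles a pipeline; the permission check happens later, when something consumes it. The sketch below is a JDK-only analogy for that shape, using Supplier in place of Mono. It is an assumption-laden illustration, not Reactor or Resilience4J code: DeferredLimitSketch, FixedPermits, and the "rate limited" exception are all made-up names, and the permit counter never refills.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

/** JDK-only analogy for "wrap a deferred call with a rate-limit check and a fallback". */
final class DeferredLimitSketch {

    /** Simplistic permit counter (hypothetical): hands out a fixed number of permits, never refills. */
    static final class FixedPermits {
        private final AtomicInteger remaining;

        FixedPermits(final int permits) {
            this.remaining = new AtomicInteger(permits);
        }

        boolean tryAcquire() {
            // Decrement only while permits are left; the value never drops below zero.
            return remaining.getAndUpdate(v -> v > 0 ? v - 1 : 0) > 0;
        }

        int remaining() {
            return remaining.get();
        }
    }

    /** Same shape as run(toRun, fallback, conf): nothing executes until get() is called on the result. */
    static <T> Supplier<T> run(final Supplier<T> toRun,
                               final Function<Throwable, T> fallback,
                               final FixedPermits limiter) {
        return () -> {
            try {
                if (!limiter.tryAcquire()) {
                    throw new IllegalStateException("rate limited");
                }
                return toRun.get();
            } catch (RuntimeException e) {
                if (fallback != null) {
                    return fallback.apply(e);
                }
                throw e;
            }
        };
    }

    public static void main(final String[] args) {
        FixedPermits limiter = new FixedPermits(1);
        Supplier<String> call = run(() -> "ok", t -> "fallback", limiter);
        // Building the pipeline consumed no permit; only get() does.
        System.out.println(limiter.remaining()); // 1
        System.out.println(call.get());          // ok
        System.out.println(call.get());          // fallback
    }
}
```

The point of the analogy is only that a breakpoint inside run is hit while the pipeline is being built, whereas a breakpoint inside the limiter is hit later, when the result is consumed.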
RateLimiter is the rate limiter, so let's look at its implementation.
It turns out to be an interface with two implementations: SemaphoreBasedRateLimiter and AtomicRateLimiter.
Not knowing which one is used, we set breakpoints on every method that might be executed in either class.
Send the request again and step through the breakpoints; execution eventually lands in one of the limiter classes, AtomicRateLimiter, roughly here:
# AtomicRateLimiter
public long reservePermission(final int permits) {
    long timeoutInNanos = ((AtomicRateLimiter.State)this.state.get()).config.getTimeoutDuration().toNanos();
    AtomicRateLimiter.State modifiedState = this.updateStateWithBackOff(permits, timeoutInNanos);
    boolean canAcquireImmediately = modifiedState.nanosToWait <= 0L;
    if (canAcquireImmediately) {
        this.publishRateLimiterEvent(true, permits);
        return 0L;
    } else {
        boolean canAcquireInTime = timeoutInNanos >= modifiedState.nanosToWait;
        if (canAcquireInTime) {
            this.publishRateLimiterEvent(true, permits);
            return modifiedState.nanosToWait;
        } else {
            this.publishRateLimiterEvent(false, permits);
            return -1L;
        }
    }
}
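The three return values above (0 for "go now", a positive number for "wait this many nanoseconds", and -1 for "rejected") are easier to follow with a toy model. The sketch below is a hypothetical, single-threaded simplification of a cycle-based limiter: the class name, fields, and arithmetic are my own, the clock is passed in as a parameter so the behavior is deterministic, and it handles one permit per call. It does not reproduce AtomicRateLimiter's real state handling.

```java
/** Hypothetical, single-threaded simplification of a cycle-based rate limiter. */
final class CycleLimiterSketch {
    private final long cyclePeriodNanos;
    private final int permitsPerCycle;
    private final long timeoutNanos;
    private long currentCycle;
    private int availablePermits; // goes negative when permits of future cycles are reserved

    CycleLimiterSketch(final long cyclePeriodNanos, final int permitsPerCycle, final long timeoutNanos) {
        this.cyclePeriodNanos = cyclePeriodNanos;
        this.permitsPerCycle = permitsPerCycle;
        this.timeoutNanos = timeoutNanos;
        this.availablePermits = permitsPerCycle;
    }

    /**
     * Returns 0 if a permit is available now, the nanoseconds to wait if one can be
     * reserved within the timeout, or -1 if the request should be rejected.
     */
    long reservePermission(final long nowNanos) {
        long cycle = nowNanos / cyclePeriodNanos;
        if (cycle > currentCycle) {
            // Refill for every elapsed cycle, capped at one cycle's worth of permits.
            long refreshed = availablePermits + (cycle - currentCycle) * permitsPerCycle;
            availablePermits = (int) Math.min(refreshed, permitsPerCycle);
            currentCycle = cycle;
        }
        if (availablePermits > 0) {
            availablePermits--;
            return 0L;
        }
        // Permits still owed to earlier callers, plus the one this caller needs.
        int deficit = 1 - availablePermits;
        long cyclesToWait = (deficit + permitsPerCycle - 1) / permitsPerCycle; // ceiling division
        long nanosToNextCycle = (cycle + 1) * cyclePeriodNanos - nowNanos;
        long nanosToWait = nanosToNextCycle + (cyclesToWait - 1) * cyclePeriodNanos;
        if (nanosToWait <= timeoutNanos) {
            availablePermits--; // reserve a permit from a future cycle
            return nanosToWait;
        }
        return -1L;
    }

    public static void main(final String[] args) {
        // One permit per 1000 ns cycle, callers willing to wait at most 1500 ns.
        CycleLimiterSketch limiter = new CycleLimiterSketch(1000L, 1, 1500L);
        System.out.println(limiter.reservePermission(0L));   // 0
        System.out.println(limiter.reservePermission(100L)); // 900
        System.out.println(limiter.reservePermission(200L)); // -1
    }
}
```

In the usage example, the second caller reserves the permit of the next cycle and is told to wait 900 ns, while the third would have to wait 1800 ns, which exceeds the timeout, so it is rejected.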
The implementation details are not our focus this time; what we care about is where this step falls in the plugin processing sequence.
As in the previous articles, we set a breakpoint in SoulWebHandler to see where the rate limiter falls in the execution order.
Debugging shows that the sequence is basically what we expected: when RateLimiterPlugin executes, the breakpoint in AtomicRateLimiter is also hit, and Divide and the other plugins start executing only after the rate limiting logic has completed.
Some thoughts on execution order and Mono
Let's take another look at the following rate limiting code:
public class RateLimiterExecutor implements Executor {
    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Generate the rate limiter
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // The rate limiting logic is presumably triggered here
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}
It returns a Mono.
Now let's look at a plugin such as Divide, which also returns a Mono:
public class DividePlugin extends AbstractSoulPlugin {
    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        if (CollectionUtils.isEmpty(upstreamList)) {
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // set the http url
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        // set the http timeout
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }
}
And here is the familiar SoulWebHandler:
public Mono<Void> execute(final ServerWebExchange exchange) {
    return Mono.defer(() -> {
        if (this.index < plugins.size()) {
            SoulPlugin plugin = plugins.get(this.index++);
            Boolean skip = plugin.skip(exchange);
            if (skip) {
                return this.execute(exchange);
            }
            return plugin.execute(exchange, this);
        }
        return Mono.empty();
    });
}
In the function above, you can clearly see that every plugin returns a Mono.
Combine this with a core concept of reactive programming: publish and subscribe. My conjecture is that these plugin Monos are published to a queue and, once subscribed, are taken out and executed in order.
The subscription logic has to live somewhere, so let's go back to the third article in this series: Soul Gateway source code reading (3) Request processing overview.
In the class HttpServerHandle, there is a suspicious section:
public void onStateChange(Connection connection, State newState) {
    if (newState == HttpServerState.REQUEST_RECEIVED) {
        try {
            if (log.isDebugEnabled()) {
                log.debug(ReactorNetty.format(connection.channel(), "Handler is being applied: {}"), new Object[]{this.handler});
            }
            HttpServerOperations ops = (HttpServerOperations)connection;
            // Publishing and subscribing happen here; handler.apply(ops, ops) leads into the logic of the subsequent plugins
            Mono.fromDirect((Publisher)this.handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());
        } catch (Throwable var4) {
            log.error(ReactorNetty.format(connection.channel(), ""), var4);
            connection.channel().close();
        }
    }
}
The rate limiting Mono comes before Divide's, so rate limiting runs first. Roughly, the conjecture is:
The fromDirect call causes the plugin Monos to be queued, and the subscribe call triggers execution in first-in, first-out order. If GlobalPlugin goes in first, it executes first. That order matches what we saw while debugging.
I have not studied reactive programming thoroughly, so this could be wrong.
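Another way to picture the ordering, one that does not depend on any queue, is that each step is deferred and, when finally run, executes its own logic before asking the chain for the next step. The JDK-only sketch below models that with Supplier standing in for Mono.defer. It is an analogy under my own assumptions, not a description of Reactor's internals: DeferredChainSketch, Chain, Plugin, and the plugin names are all invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** JDK-only analogy: a plugin chain whose steps are deferred until the result is consumed. */
final class DeferredChainSketch {

    /** Hypothetical plugin contract: do some work, then return the (deferred) rest of the chain. */
    interface Plugin {
        Supplier<Void> execute(List<String> log, Chain chain);
    }

    /** Hypothetical chain: hands out the next plugin each time a deferred step is actually run. */
    static final class Chain {
        private final List<Plugin> plugins;
        private int index;

        Chain(final List<Plugin> plugins) {
            this.plugins = plugins;
        }

        Supplier<Void> execute(final List<String> log) {
            // Like Mono.defer: the body runs only when get() is called on the returned Supplier.
            return () -> {
                if (index < plugins.size()) {
                    Plugin plugin = plugins.get(index++);
                    return plugin.execute(log, this).get();
                }
                return null;
            };
        }
    }

    /** A plugin that records its name and then delegates to the rest of the chain. */
    static Plugin named(final String name) {
        return (log, chain) -> {
            log.add(name);
            return chain.execute(log);
        };
    }

    /** Builds a three-plugin chain, checks that building runs nothing, then runs it. */
    static List<String> runOrder() {
        List<String> log = new ArrayList<>();
        Chain chain = new Chain(Arrays.asList(named("global"), named("rateLimiter"), named("divide")));
        Supplier<Void> pipeline = chain.execute(log);
        if (!log.isEmpty()) {
            throw new IllegalStateException("nothing should run before the pipeline is consumed");
        }
        pipeline.get(); // plays the role of subscribe()
        return log;
    }

    public static void main(final String[] args) {
        System.out.println(runOrder()); // [global, rateLimiter, divide]
    }
}
```

Under this picture the order observed in the debugger follows from the plugin list order alone: a plugin earlier in the list does its work before it asks the chain to continue.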
Open question
In the code below that generates the rate limiter, it looks as if a new limiter is created on every request. Could it be reused? For example, the configuration could carry a field that indicates whether it has been updated: if there is no update, reuse the previous limiter; if there is an update, generate a new one.
Of course, this optimization requires understanding how dynamic configuration updates work before judging whether it is feasible.
I am not familiar with Resilience4J, so it may be that Resilience4JRegistryFactory in the code below already caches and reuses limiters.
public class RateLimiterExecutor implements Executor {
    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Generate the rate limiter
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // The rate limiting logic is presumably triggered here
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }
}
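To make the reuse idea concrete, here is a minimal sketch of a cache keyed by rule id that keeps the existing limiter while the configuration is unchanged and replaces it when the configuration differs. Everything in it (LimiterCacheSketch, Config, Limiter, limiterFor) is hypothetical; it says nothing about what Resilience4JRegistryFactory actually does, which still has to be checked in the source.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical cache that reuses a limiter per rule id until its configuration changes. */
final class LimiterCacheSketch {

    /** Illustrative immutable configuration; equality decides whether a cached limiter is reusable. */
    static final class Config {
        final int limitForPeriod;
        final long refreshPeriodMillis;

        Config(final int limitForPeriod, final long refreshPeriodMillis) {
            this.limitForPeriod = limitForPeriod;
            this.refreshPeriodMillis = refreshPeriodMillis;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof Config)) {
                return false;
            }
            Config other = (Config) o;
            return limitForPeriod == other.limitForPeriod && refreshPeriodMillis == other.refreshPeriodMillis;
        }

        @Override
        public int hashCode() {
            return Objects.hash(limitForPeriod, refreshPeriodMillis);
        }
    }

    /** Stand-in for a real limiter; it only remembers the configuration it was built from. */
    static final class Limiter {
        final Config config;

        Limiter(final Config config) {
            this.config = config;
        }
    }

    private final ConcurrentMap<String, Limiter> cache = new ConcurrentHashMap<>();

    /** Returns the cached limiter when the configuration is unchanged, otherwise a fresh one. */
    Limiter limiterFor(final String ruleId, final Config config) {
        return cache.compute(ruleId, (id, existing) ->
                existing != null && existing.config.equals(config) ? existing : new Limiter(config));
    }

    public static void main(final String[] args) {
        LimiterCacheSketch cache = new LimiterCacheSketch();
        Limiter first = cache.limiterFor("rule-1", new Config(1, 1000L));
        Limiter second = cache.limiterFor("rule-1", new Config(1, 1000L));
        Limiter updated = cache.limiterFor("rule-1", new Config(5, 1000L));
        System.out.println(first == second);  // true
        System.out.println(first == updated); // false
    }
}
```

Comparing configurations by value avoids the extra "updated" flag proposed above, at the cost of one equality check per request; which trade-off fits Soul depends on how configuration updates are delivered.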
Conclusion
This article walked through configuring the Resilience4J plugin. Debugging verified where the rate limiting logic runs in the plugin chain: its execution order is consistent with the plugin's position in the chain.
It also offered a preliminary conjecture about how the plugin chain's Monos are queued and executed; we will check whether the conjecture holds when we study reactive programming later.
Finally, it raised a question about reusing the generated rate limiter, to be revisited when we analyze configuration updates.
References
- Resilience4j plug-in
- Resilience4j source code analysis (3): RateLimiter and common traffic limiting algorithms
Soul Gateway source code analysis article list
Github
- Soul source code reading (1) Overview
- Soul source code reading (2) The initial run of the code
- HTTP request processing overview
- Dubbo request overview
- Soul Gateway source code reading (5) Request type exploration
- Soul Gateway source code reading (6) Sofa request processing overview
- HTTP parameter request error
Juejin
- Soul Gateway source code reading (1) Overview
- Soul Gateway source code reading (2) The initial run of the code
- Soul Gateway source code reading (3) Request processing overview
- Dubbo request overview
- Soul Gateway source code reading (5) Request type exploration
- Soul Gateway source code reading (6) Sofa request processing overview
- HTTP parameter request error