A list,
In the last section, we learned the use of the Soul gateway limit plug-in. In this section, we will take a look at the core principle and key code implementation behind it.
Second, source code analysis
1. RateLimiterPluginConfiguration parsing in the Soul – admin opens the RateLimiter plug-ins, and configure the rules, In the soul – the bootstrap start automatically after loading the configuration class RateLimiterPluginConfiguration, automatically added current-limiting RateLimiterPlugin plugin to the container, as shown below:
@Configuration
public class RateLimiterPluginConfiguration {
@Bean
public SoulPlugin rateLimiterPlugin(a) {
return new RateLimiterPlugin(new RedisRateLimiter());
}
@Bean
public PluginDataHandler rateLimiterPluginDataHandler(a) {
return newRateLimiterPluginDataHandler(); }}Copy the code
RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin: RateLimiterPlugin
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final String handle = rule.getHandle();
// Get configuration parameters
final RateLimiterHandle limiterHandle = GsonUtils.getInstance().fromJson(handle, RateLimiterHandle.class);
// Use response.isallowed () to determine whether the plugin chain continues
return redisRateLimiter.isAllowed(rule.getId(), limiterHandle.getReplenishRate(), limiterHandle.getBurstCapacity())
.flatMap(response -> {
if(! response.isAllowed()) { exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); Object error = SoulResultWrap.error(SoulResultEnum.TOO_MANY_REQUESTS.getCode(), SoulResultEnum.TOO_MANY_REQUESTS.getMsg(),null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
});
}
Copy the code
Use Response.isallowed () to determine whether the plug-in chain continues execution. If false, an exception message is thrown.
3. RedisRateLimiter parsing
- Generate Keys from RuleID
private static List<String> getKeys(final String id) {
String prefix = "request_rate_limiter.{" + id;
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
Copy the code
- / meta-INF /scripts/request_rate_limiter. Lua
private RedisScript<List<Long>> redisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/META-INF/scripts/request_rate_limiter.lua")));
redisScript.setResultType(List.class);
return redisScript;
}
Copy the code
- Read the above two methods, and then see RedisRateLimiter. IsAllowed () method:
public Mono<RateLimiterResponse> isAllowed(final String id, final double replenishRate, final double burstCapacity) {
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
// Generate Keys based on RuleID
List<String> keys = getKeys(id);
// Pass keys and scriptArgs (rate, capacity, current timestamp (in seconds), number of tokens currently required) to lua as input parameters
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + ""."1");
/ / call ReactiveRedisTemplate. The execute () method performs the lua script
Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(this.script, keys, scriptArgs);
return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}).map(results -> {
boolean allowed = results.get(0) = =1L;
Long tokensLeft = results.get(1);
RateLimiterResponse rateLimiterResponse = new RateLimiterResponse(allowed, tokensLeft);
log.info("RateLimiter response:{}", rateLimiterResponse.toString());
return rateLimiterResponse;
}).doOnError(throwable -> log.error("Error determining if user allowed from redis:{}", throwable.getMessage()));
}
Copy the code
Redis executes the Lua script to determine the current number of token buckets remaining and refresh the token bucket, returning: Can continue access, token bucket remaining capacity.
Request_rate_limiter. Lua = request_rate_limiter.
local tokens_key = KEYS[1] local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING, "tokens_key " .. Tokens_key) -- Parameter 1: rate local rate = tonumber(ARGV[1]) -- Parameter 2: capacity local capacity = tonumber(ARGV[2]) -- parameter 3: Local now = tonumber(ARGV[3]) -- parameter 4: Local Requested = tonumber(ARGV[4]) -- Fill time = capacity/rate local fill_time = capacity/rate --keys expiration time local TTL = Math.floor (fill_time*2) -- If the token is empty, Local last_tokens = tonumber(redis.call("get", Tokens_key)) if last_tokens == nil then last_tokens = capacity end Checksum = tonumber(redis.call("get", Refresh timestamp_key)) if the field is refreshed == nil then last_refreshed = 0 end Checksum local delta = math.max(0, now-last_checksum) -- The number of last token buckets + the number of currently filled buckets (time difference * request rate), and the minimum value for capacity. Local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) Local allowed = filled_tokens >= Requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then New_tokens = filled_tokens - Requested allowed_num = 1 end redis. Call ("setex", tokens_key, TTL, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }Copy the code
conclusion
The rateLimiter plugin is implemented by executing lua scripts in Redis to ensure atomic operations. The basic operating principle is clear, if you want to thoroughly understand, but also down to learn about lua script related knowledge.