The target

  • Integrate flow limiting (rate_limiter) plug-in
  • Current limiting is triggered by the pressure test interface
  • Soul gateway limit plug-in source code analysis
  • conclusion

Introduction to the Soul gateway traffic limiting plug-in

Soul gateway traffic limiting plug-in is the core of the flow control, soul gateway traffic limiting granularity can be interface or parameter level, soul gateway traffic limiting plug-in is based on redis token bucket algorithm, so before integrating soul gateway to build a REDis.

Gateway integrated traffic limiting plug-in

  • pom
  <dependency>
      <groupId>org.dromara</groupId>
      <artifactId>soul-spring-boot-starter-plugin-ratelimiter</artifactId>
      <version>${soul-version}</version>
  </dependency>
Copy the code

Soul-admin Traffic limiting plug-in configuration

Using current limiting plug-in must correct the redis configuration to the current limiting plug-ins, and open current-limiting plug-in, and then must be new selector and the corresponding current limiting rules, and the rules for the current limit and must be properly configured, otherwise current-limiting may not take effect, if the plug-in in redis configuration is not actually requests at all.

  • Enable the traffic limiting plug-in

  • Selector configuration

  • Rule configuration

Current limiting is triggered by the pressure test interface

  • Pressure measuring tool: jmeter3.2
  • Pressure measurement parameters: 1 second, 20 concurrent, execute a cycle
  • Traffic limiting rule parameters: Capacity 10, rate 10
  • View the traffic limiting result

Current limiting plug-in source code analysis

The soul gateway uses the responsibility chain pattern to match the execution of the plugin chain, so we will directly look at the flow limiting plugin. The soul gateway uses the responsibility chain pattern to match the execution of the plugin chain.

  • RateLimiterPluginConfiguration
@ Configuration public class RateLimiterPluginConfiguration {/ * * * current-limiting plug-in Configuration * / @ Bean public SoulPlugin rateLimiterPlugin () Return new RateLimiterPlugin(new RedisRateLimiter()); } / redis client initialization * / * * * @ Bean public PluginDataHandler rateLimiterPluginDataHandler () {return new RateLimiterPluginDataHandler(); }}Copy the code
  • RateLimiterPluginDataHandler redis connection pool initialization
public class RateLimiterPluginDataHandler implements PluginDataHandler { @Override public void handlerPlugin(final PluginData) {if (objects.nonnull (PluginData) && plugindata.getenabled ()) {// Initialize redis configuration parameters RateLimiterConfig rateLimiterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), RateLimiterConfig.class);  // If the reactiveRedisTemplate is empty, or the redis configuration parameter object is empty, To create the redis client (connection pool) if (Objects. IsNull (Singleton. INST. Get (ReactiveRedisTemplate. Class)) | | Objects.isNull(Singleton.INST.get(RateLimiterConfig.class)) || ! rateLimiterConfig.equals(Singleton.INST.get(RateLimiterConfig.class))) { LettuceConnectionFactory lettuceConnectionFactory = createLettuceConnectionFactory(rateLimiterConfig); lettuceConnectionFactory.afterPropertiesSet(); RedisSerializer<String> serializer = new StringRedisSerializer(); RedisSerializationContext<String, String> serializationContext = RedisSerializationContext.<String, String>newSerializationContext().key(serializer).value(serializer).hashKey(serializer).hashValue(serializer).build(); ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new ReactiveRedisTemplate<>(lettuceConnectionFactory, serializationContext); // create reactiveRedisTemplate. Create reactiveRedisTemplate and rateLimiterConfig is Singleton Singleton. INST. Single (reactiveRedisTemplate. Class, reactiveRedisTemplate); Singleton.INST.single(RateLimiterConfig.class, rateLimiterConfig); } } } @Override public String pluginNamed() { return PluginEnum.RATE_LIMITER.getName(); } / / create based on Lettuce redis connection pool private LettuceConnectionFactory createLettuceConnectionFactory (final RateLimiterConfig rateLimiterConfig) { LettuceClientConfiguration lettuceClientConfiguration = getLettuceClientConfiguration(rateLimiterConfig); if (RedisModeEnum.SENTINEL.getName().equals(rateLimiterConfig.getMode())) { return new LettuceConnectionFactory(redisSentinelConfiguration(rateLimiterConfig), lettuceClientConfiguration); } if (RedisModeEnum.CLUSTER.getName().equals(rateLimiterConfig.getMode())) { return new LettuceConnectionFactory(redisClusterConfiguration(rateLimiterConfig), lettuceClientConfiguration); } return new LettuceConnectionFactory(redisStandaloneConfiguration(rateLimiterConfig), lettuceClientConfiguration); } private LettuceClientConfiguration getLettuceClientConfiguration(final RateLimiterConfig rateLimiterConfig) { return LettucePoolingClientConfiguration.builder().poolConfig(getPoolConfig(rateLimiterConfig)).build(); } // Redis connection pool configuration private GenericObjectPoolConfig<? > getPoolConfig(final RateLimiterConfig rateLimiterConfig) { GenericObjectPoolConfig<? > config = new GenericObjectPoolConfig<>(); config.setMaxTotal(rateLimiterConfig.getMaxActive()); config.setMaxIdle(rateLimiterConfig.getMaxIdle()); config.setMinIdle(rateLimiterConfig.getMinIdle()); if (rateLimiterConfig.getMaxWait() ! = null) { config.setMaxWaitMillis(rateLimiterConfig.getMaxWait().toMillis()); } return config; } / single point redis * * * * / protected final RedisStandaloneConfiguration RedisStandaloneConfiguration (final RateLimiterConfig rateLimiterConfig) { RedisStandaloneConfiguration config = new RedisStandaloneConfiguration(); String[] parts = StringUtils.split(rateLimiterConfig.getUrl(), ":"); assert parts ! = null; config.setHostName(parts[0]); config.setPort(Integer.parseInt(parts[1])); if (rateLimiterConfig.getPassword() ! = null) { config.setPassword(RedisPassword.of(rateLimiterConfig.getPassword())); } config.setDatabase(rateLimiterConfig.getDatabase()); return config; } / Cluster redis Cluster * * * * / private RedisClusterConfiguration RedisClusterConfiguration (final RateLimiterConfig rateLimiterConfig) { RedisClusterConfiguration config = new RedisClusterConfiguration(); config.setClusterNodes(createRedisNode(rateLimiterConfig.getUrl())); if (rateLimiterConfig.getPassword() ! = null) { config.setPassword(RedisPassword.of(rateLimiterConfig.getPassword())); } return config; } / / Sentinel (sentry) model of redis cluster private RedisSentinelConfiguration RedisSentinelConfiguration (final RateLimiterConfig rateLimiterConfig) { RedisSentinelConfiguration config = new RedisSentinelConfiguration(); config.master(rateLimiterConfig.getMaster()); config.setSentinels(createRedisNode(rateLimiterConfig.getUrl())); if (rateLimiterConfig.getPassword() ! = null) { config.setPassword(RedisPassword.of(rateLimiterConfig.getPassword())); } config.setDatabase(rateLimiterConfig.getDatabase()); return config; } private List<RedisNode> createRedisNode(final String url) { List<RedisNode> redisNodes = new ArrayList<>(); List<String> nodes = Lists.newArrayList(Splitter.on(";" ).split(url)); for (String node : nodes) { String[] parts = StringUtils.split(node, ":"); Assert.state(Objects.requireNonNull(parts).length == 2, "Must be defined as 'host:port'"); redisNodes.add(new RedisNode(parts[0], Integer.parseInt(parts[1]))); } return redisNodes; }}Copy the code
  • RedisRateLimiter The process executes traffic limiting
@slf4j public class RedisRateLimiter {// lua script private final RedisScript<List<Long>> script; Private final AtomicBoolean Initialized = new AtomicBoolean(false); /** * Instantiates a new Redis rate limiter. */ public RedisRateLimiter() { this.script = redisScript(); initialized.compareAndSet(false, true); } /** * This uses a basic token bucket algorithm and relies on the fact that Redis scripts * execute atomically. No other operations can run between fetching the count and * writing the new count. * * @param id is rule id * @param replenishRate replenishRate * @param burstCapacity burstCapacity * @return {@code Mono<Response>} to indicate when request processing is complete */ @SuppressWarnings("unchecked") public Mono<RateLimiterResponse> isAllowed(final String Id, final double replenishRate, final double burstCapacity) {// Whether the current limiter successfully initialized if (! this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } // redis key List<String> keys = getKeys(id); // LuA script List<String> scriptArgs = Arrays. AsList (replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); / / execute current limiting Flux < List < Long > > resultFlux = Singleton. INST. Get (ReactiveRedisTemplate. Class). The execute (this script, keys, scriptArgs); Return resultflux.onerrorResume (throwable -> flux.just (array.asList (1L, -1L))).reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }).map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); RateLimiterResponse rateLimiterResponse = new RateLimiterResponse(allowed, tokensLeft); log.info("RateLimiter response:{}", rateLimiterResponse.toString()); return rateLimiterResponse; }).doOnError(throwable -> log.error("Error determining if user allowed from redis:{}", throwable.getMessage())); } private static List<String> getKeys(final String id) { String prefix = "request_rate_limiter.{" + id; String tokenKey = prefix + "}.tokens"; String timestampKey = prefix + "}.timestamp"; return Arrays.asList(tokenKey, timestampKey); // Scripting /request_rate_limiter. Lua @SuppressWarnings("unchecked") private RedisScript<List<Long>>  redisScript() { DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/META-INF/scripts/request_rate_limiter.lua"))); redisScript.setResultType(List.class); return redisScript; }}Copy the code
  • RateLimiterPlugin A stream limiting plugin
public class RateLimiterPlugin extends AbstractSoulPlugin { private final RedisRateLimiter redisRateLimiter; /** * Instantiates a new Rate limiter plugin. * * @param redisRateLimiter the redis rate limiter */ public RateLimiterPlugin(final RedisRateLimiter redisRateLimiter) { this.redisRateLimiter = redisRateLimiter; } @Override public String named() { return PluginEnum.RATE_LIMITER.getName(); } @Override public int getOrder() { return PluginEnum.RATE_LIMITER.getCode(); Override protected Mono<Void> doExecute(Final ServerWebExchange exchange, final SoulPluginChain chain, Final SelectorData selector, final RuleData rule) {// Get traffic limiting data from the rule final String Handle = rule-gethandle (); Final RateLimiterHandle limiterHandle = gsonutils.getInstance ().fromjson (handle, RateLimiterHandle.class); / / returns the result data on current limit and process the return. RedisRateLimiter isAllowed (rule. The getId (), limiterHandle. GetReplenishRate (), LimiterHandle. GetBurstCapacity ()). FlatMap (whether the response - > {/ / current limiting the if (! response.isAllowed()) { exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); Object error = SoulResultWrap.error(SoulResultEnum.TOO_MANY_REQUESTS.getCode(), SoulResultEnum.TOO_MANY_REQUESTS.getMsg(), null); / / are current limiting, processing interface returned to return WebFluxResultUtils. Result (exchange, error); } // Return chain.execute(exchange); }); }}Copy the code
  • Request_rate_limiter. Lua Lua script for the traffic limiting token bucket algorithm, the code logic of this script will be analyzed later. The advantage of using lua script here is to ensure the atomization of redis operations
-- -- Licensed to the Apache Software Foundation (ASF) under one or more -- contributor license agreements. See the NOTICE file distributed with -- this work for additional information regarding copyright ownership. -- The ASF licenses This file to You under the Apache License, Version 2.0 -- (the "License"); you may not use this file except in compliance with -- the License. You may obtain a copy of the License at -- -- http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software -- distributed under the License is distributed on an "AS IS" BASIS, -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- See the License for the specific language governing permissions and -- limitations under the License. -- local tokens_key = KEYS[1] local timestamp_key = KEYS[2] --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) local rate = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local now = tonumber(ARGV[3]) local requested = tonumber(ARGV[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) return { allowed_num, new_tokens }Copy the code

conclusion

Soul gateway integration using the flow limiting plug-in is very simple, it is necessary to pay attention to the correct configuration of the plug-in redis information, create their own selector, flow limiting rules according to the business to choose the appropriate matching mode; Subsequent we also on the source code of the limiting plug-in for general analysis, relying on the soul gateway responsibility chain processing plug-in model, limiting plug-in as a chain of responsibility chain is also according to the matching rules of the plug-in processing, in the redis service soul gateway support single point Redis, Cluster cluster, Sentinel cluster mode, The soul gateway redis client configuration uses the REDis connection pool, and in order to ensure the atomicity of redis operation, the Soul gateway traffic limiting algorithm is implemented using Lua script.