Review

In the previous article, which took a first look at how an HTTP request is processed, we walked through the overall plug-in chain of the Soul gateway and learned what DividePlugin, GlobalPlugin, WebClientPlugin and WebClientResponsePlugin each do. While tracing that flow, we saw the plug-ins execute in sequence, and quite a few of them run before DividePlugin, including the RateLimiterPlugin, the rate-limiting plug-in that is the topic of this chapter.

Learn to use

Read the official documentation to get a general sense of it

RateLimiter plug-in

Reading the official documentation tells us that the RateLimiterPlugin has two core parameters: rate and capacity.

The following explanations are from the official documentation:

  • Capacity: The maximum number of requests a user can make in a single second. This is the number of tokens the token bucket can hold.
  • Rate: How many requests per second a user is allowed to make without any being dropped. This is the rate at which the token bucket is filled.
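To make these two numbers concrete: with rate = 1 and capacity = 100, the bucket is refilled at one token per second, and a burst of up to 100 requests can be absorbed before further requests start being rejected.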

It can be seen that the core of the RateLimiterPlugin is an implementation of the token bucket algorithm.

Ps: There are four common rate-limiting algorithms: the token bucket algorithm, the leaky bucket algorithm, the counter (fixed window) algorithm and the sliding window algorithm. For details, see the many blog posts on the subject.

Initial use

Enabling the plug-in

In the Soul admin console, under System Administration → Plugin Management, change the plug-in's state to enabled. Note that the configuration for Soul's token bucket is Redis based.

Why is Soul’s token bucket algorithm based on Redis?

In a cluster deployment, a token bucket kept on a single machine cannot provide gateway-wide rate limiting, so the bucket state has to live in a shared store.

Add traffic limiting selectors and rules

In the Soul admin console, open the plug-in list, select rate_limiter, and add a selector and rule configuration. If you are not sure how to add them, first read the earlier article on the selector and rule matching logic. Both capacity and rate are configured here so that we can verify the plug-in is working.
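For reference, the rule handle is later deserialized into a RateLimiterHandle with replenishRate and burstCapacity fields (see the doExecute code further down), so the rule configuration boils down to a small JSON snippet along the lines of { "replenishRate": 1, "burstCapacity": 100 }; the values here are only an example.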

Accessing the corresponding interface

Call http://127.0.0.1:9195/http/test/findByUserId?userId=10. If requests come in faster than the configured rate of 1 per second and the interface returns the following result, the plug-in is working:

{ "code": 429, "message": "You have been restricted, please try again later!" , "data": null }Copy the code

Read the source code with questions

How does Soul ensure that a Redis configuration change made on the admin page takes effect immediately, so that the corresponding Redis connection in the gateway is switched right away?

The answer is, naturally, data synchronization.

When the plug-in configuration is modified, an event describing the plug-in data change is also published. From the earlier walkthrough of the Soul gateway's overall data synchronization process, we already know that modified plug-in data not only updates the JVM-level cache but is also handed to the corresponding plug-in data handler. For RateLimiterPlugin, the main entry point is handlerPlugin. So what does this implementation actually do?

The concrete method is RateLimiterPluginDataHandler#handlerPlugin:

public void handlerPlugin(final PluginData pluginData) {
    if (Objects.nonNull(pluginData) && pluginData.getEnabled()) {
        RateLimiterConfig rateLimiterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), RateLimiterConfig.class);
        // Decide whether the Redis connection needs to be (re)created:
        // no template yet, no cached config yet, or the config has changed
        if (Objects.isNull(Singleton.INST.get(ReactiveRedisTemplate.class))
                || Objects.isNull(Singleton.INST.get(RateLimiterConfig.class))
                || !rateLimiterConfig.equals(Singleton.INST.get(RateLimiterConfig.class))) {
            LettuceConnectionFactory lettuceConnectionFactory = createLettuceConnectionFactory(rateLimiterConfig);
            lettuceConnectionFactory.afterPropertiesSet();
            RedisSerializer<String> serializer = new StringRedisSerializer();
            RedisSerializationContext<String, String> serializationContext =
                    RedisSerializationContext.<String, String>newSerializationContext()
                            .key(serializer).value(serializer)
                            .hashKey(serializer).hashValue(serializer)
                            .build();
            ReactiveRedisTemplate<String, String> reactiveRedisTemplate =
                    new ReactiveRedisTemplate<>(lettuceConnectionFactory, serializationContext);
            Singleton.INST.single(ReactiveRedisTemplate.class, reactiveRedisTemplate);
            Singleton.INST.single(RateLimiterConfig.class, rateLimiterConfig);
        }
    }
}

The above code has a few key points:

In the above code, the rate-limiting plug-in's configuration and the corresponding ReactiveRedisTemplate instance are stored in the Singleton.INST map.

When plug-in data arrives, the handler checks whether a Redis connection instance and a rate-limiter configuration instance already exist, and whether the cached configuration equals the incoming one. If any of these checks fail, the configuration is considered changed, so a new configuration instance and connection factory are created and put back into the Singleton.INST map. This is what makes hot reloading of Redis configuration changes possible.

The code inside the if block builds the corresponding Redis connection on top of Spring Data Redis: a Lettuce connection factory plus a reactive template.
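createLettuceConnectionFactory itself is not shown above. Here is a minimal sketch of what such a factory method can look like for a standalone Redis (Soul also handles sentinel and cluster modes, omitted here), assuming hypothetical RateLimiterConfig accessors such as getUrl() returning "host:port" and getPassword():

// Sketch only: standalone mode, hypothetical RateLimiterConfig accessors.
// Uses org.springframework.data.redis.connection.*, the Lettuce client
// configuration classes and java.time.Duration.
private LettuceConnectionFactory createLettuceConnectionFactory(final RateLimiterConfig rateLimiterConfig) {
    String[] hostAndPort = rateLimiterConfig.getUrl().split(":");
    RedisStandaloneConfiguration standaloneConfig =
            new RedisStandaloneConfiguration(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    if (rateLimiterConfig.getPassword() != null) {
        standaloneConfig.setPassword(RedisPassword.of(rateLimiterConfig.getPassword()));
    }
    LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
            .commandTimeout(Duration.ofSeconds(2))
            .build();
    return new LettuceConnectionFactory(standaloneConfig, clientConfig);
}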

Ps: Singleton.INST is a singleton implemented with an enum.
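For readers unfamiliar with the pattern, here is a minimal sketch of an enum-based singleton holder in the spirit of Singleton.INST; the internals are illustrative, only the single/get usage mirrors what we saw above.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A sketch of an enum-based singleton holder. The JVM guarantees the enum
// constant is created exactly once, so the map it guards is effectively global.
public enum Singleton {

    INST;

    private final Map<String, Object> singles = new ConcurrentHashMap<>();

    // register an instance under its class name
    public void single(final Class<?> clazz, final Object o) {
        singles.put(clazz.getName(), o);
    }

    // look up an instance by class, cast to the requested type
    @SuppressWarnings("unchecked")
    public <T> T get(final Class<T> clazz) {
        return (T) singles.get(clazz.getName());
    }
}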

How is the rate limiting actually implemented under the hood?

Debugging the invocation chain

RateLimiterPlugin extends AbstractSoulPlugin because it only needs to limit traffic for requests that match specific selectors and rules. AbstractSoulPlugin's execute method has already been explored; to get a better picture of this class, see the earlier walkthrough of the HTTP call flow.

This section focuses on what the doExecute method does.

protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain,
                               final SelectorData selector, final RuleData rule) {
    final String handle = rule.getHandle();
    final RateLimiterHandle limiterHandle = GsonUtils.getInstance().fromJson(handle, RateLimiterHandle.class);
    return redisRateLimiter.isAllowed(rule.getId(), limiterHandle.getReplenishRate(), limiterHandle.getBurstCapacity())
            .flatMap(response -> {
                if (!response.isAllowed()) {
                    // No token available: answer with the 429 error code and message
                    exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                    Object error = SoulResultWrap.error(SoulResultEnum.TOO_MANY_REQUESTS.getCode(),
                            SoulResultEnum.TOO_MANY_REQUESTS.getMsg(), null);
                    return WebFluxResultUtils.result(exchange, error);
                }
                return chain.execute(exchange);
            });
}

As the code above shows, redisRateLimiter.isAllowed decides whether a token could be acquired. The method looks like this:

public Mono<RateLimiterResponse> isAllowed(final String id, final double replenishRate, final double burstCapacity) {
    if (!this.initialized.get()) {
        throw new IllegalStateException("RedisRateLimiter is not initialized");
    }
    // Get the Redis keys for this rule
    List<String> keys = getKeys(id);
    // Script arguments: the rate, the capacity, the current timestamp in seconds,
    // and a fixed "1" for the number of tokens requested
    List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
            Instant.now().getEpochSecond() + "", "1");
    // Execute the Lua script
    Flux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(this.script, keys, scriptArgs);
    return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
            .reduce(new ArrayList<Long>(), (longs, l) -> {
                longs.addAll(l);
                return longs;
            })
            .map(results -> {
                // allowed: 1 means a token was acquired successfully
                boolean allowed = results.get(0) == 1L;
                Long tokensLeft = results.get(1);
                RateLimiterResponse rateLimiterResponse = new RateLimiterResponse(allowed, tokensLeft);
                log.info("RateLimiter response:{}", rateLimiterResponse.toString());
                return rateLimiterResponse;
            })
            .doOnError(throwable -> log.error("Error determining if user allowed from redis:{}", throwable.getMessage()));
}

The getKeys(id) method

This method builds the keys that Redis will operate on. Two keys are returned for each rule; their format is described below.

The particularly long number in the middle of each key is the rule id, because a rule is the smallest granularity at which traffic is limited.

The key ending in timestamp records the time of the last call.

The key ending in tokens records the number of tokens left after that call.
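The exact key prefix is not reproduced here. As a hedged sketch, modeled on the Spring Cloud Gateway RedisRateLimiter that this implementation closely resembles, getKeys roughly does the following (the actual prefix in Soul may differ):

// Sketch only: returns the tokens key and the timestamp key for one rule.
// The braces around the id make both keys hash to the same slot in a Redis Cluster.
private static List<String> getKeys(final String id) {
    String prefix = "request_rate_limiter.{" + id + "}";
    String tokenKey = prefix + ".tokens";         // tokens left after the last call
    String timestampKey = prefix + ".timestamp";  // time of the last call
    return Arrays.asList(tokenKey, timestampKey);
}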

execute(this.script, keys, scriptArgs)

keys is the return value of getKeys(id), while scriptArgs carries the script parameters listed above.

As the code above already suggests, the actual rate-limiting logic is delegated to a Lua script. Because Redis executes a Lua script atomically, the read-compute-write cycle on the token bucket cannot be interleaved by concurrent requests.

Ps: A quick reminder that the algorithm used here is the token bucket algorithm, and there are two common ways to implement it. The first uses a background thread that keeps producing tokens, and each incoming request takes a token from a queue; when the configured threshold is large, generating tokens this way is very costly. The second computes the number of available tokens lazily, at the moment a token is requested. Soul is based on the second implementation, sketched below.
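To make the second approach concrete, here is a minimal single-machine sketch (not Soul's code) of a token bucket that recomputes the available tokens lazily whenever a permit is requested; the Lua script analysed below does exactly this, only against Redis so that the state is shared across the cluster.

// A single-machine sketch of a lazily refilled token bucket (illustrative only).
// No background thread is needed: tokens are recomputed from the elapsed time
// each time a permit is requested.
public class LazyTokenBucket {

    private final double rate;      // tokens added per second
    private final double capacity;  // maximum number of tokens the bucket holds
    private double tokens;          // tokens currently available
    private long lastRefillSecond;  // epoch second of the last refill

    public LazyTokenBucket(final double rate, final double capacity) {
        this.rate = rate;
        this.capacity = capacity;
        this.tokens = capacity;
        this.lastRefillSecond = java.time.Instant.now().getEpochSecond();
    }

    public synchronized boolean tryAcquire() {
        long now = java.time.Instant.now().getEpochSecond();
        long delta = Math.max(0, now - lastRefillSecond);
        // refill for the elapsed time, never exceeding the capacity
        tokens = Math.min(capacity, tokens + delta * rate);
        lastRefillSecond = now;
        if (tokens >= 1) {
            tokens -= 1;
            return true;
        }
        return false;
    }
}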

Analysis of the Lua rate-limiting script

-- key that stores the remaining tokens
local tokens_key = KEYS[1]
-- key that stores the time the current rule was last called
local timestamp_key = KEYS[2]

-- rate: tokens added per second
local rate = tonumber(ARGV[1])
-- capacity of the bucket
local capacity = tonumber(ARGV[2])
-- current timestamp in seconds
local now = tonumber(ARGV[3])
-- number of tokens requested, fixed at 1
local requested = tonumber(ARGV[4])

-- capacity divided by rate gives the time needed to fill the bucket
local fill_time = capacity/rate
-- expiration time of the keys: twice the fill time
local ttl = math.floor(fill_time*2)

-- tokens left after the last call; default to a full bucket
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end

-- time of the last call; default to 0
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

-- elapsed time since the last call
local delta = math.max(0, now-last_refreshed)
-- refill the bucket for the elapsed time, never exceeding the capacity
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

-- write back the new token count and the current timestamp, both with the ttl
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }
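To put numbers on the script: with rate = 1 and capacity = 100, fill_time = 100/1 = 100 seconds and ttl = 200 seconds. If the previous call left 5 tokens and 30 seconds have passed, then delta = 30 and filled_tokens = min(100, 5 + 30*1) = 35, so the request is allowed, allowed_num is 1, and 34 tokens are written back together with the new timestamp.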

KEYS[1], ARGV[1] and so on are how a Redis Lua script receives the keys and arguments passed in by the caller.

The overall logic of the Lua script is quite clear, so there is not much more to explain in detail; the comments in the code cover it.

I still have two questions here:

  • Why is the ttl computed as fill_time multiplied by 2? Is the *2 just the smallest safety margin so the keys do not expire before the bucket could be refilled?
  • In the refill calculation last_tokens+(delta*rate), delta is in seconds because the timestamp passed in is Instant.now().getEpochSecond(); if a millisecond timestamp were used instead, would it have to become last_tokens+((delta/1000)*rate)?