This is the 26th day of my participation in the August Text Challenge.
Common traffic limiting algorithms
Counter algorithm
The counter algorithm is the simplest and crudest way to limit traffic. Normally we limit the number of requests that may pass in one second. For example, to cap QPS at 100, start timing from the first request; during the next 1 s, add 1 to a counter for every request that arrives. Once the count reaches 100, all subsequent requests are rejected. When the 1 s window ends, reset the count to 0 and start counting again. The implementation might look like this: on each service invocation, AtomicLong#incrementAndGet() increments the counter by one and returns the new value, which is compared with the threshold. This implementation has a well-known drawback: if 100 requests have already passed in the first 10 ms of the 1 s window, every request in the remaining 990 ms must be rejected. We call this the “spike phenomenon”.
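A minimal sketch of the counter approach in Java. CounterRateLimiter is a hypothetical class, not from the original project; the window starts when the limiter is created rather than at the first request, and the clock is injected as a LongSupplier of milliseconds so the behaviour can be checked without sleeping.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Fixed-window counter limiter: allows at most `limit` requests per window (illustrative sketch). */
class CounterRateLimiter {
    private final long limit;
    private final long windowMillis;
    private final LongSupplier clock; // assumed millisecond clock, injectable for testing
    private final AtomicLong counter = new AtomicLong();
    private volatile long windowStart;

    CounterRateLimiter(long limit, long windowMillis, LongSupplier clock) {
        this.limit = limit;
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.windowStart = clock.getAsLong(); // the first window starts at creation time
    }

    boolean tryAcquire() {
        long now = clock.getAsLong();
        if (now - windowStart >= windowMillis) {
            synchronized (this) {
                // Re-check under the lock so that only one thread resets the window.
                if (now - windowStart >= windowMillis) {
                    windowStart = now;
                    counter.set(0);
                }
            }
        }
        // incrementAndGet() returns the new count, which is compared with the threshold.
        return counter.incrementAndGet() <= limit;
    }
}
```

With a limit of 100 per 1000 ms, 100 requests arriving at t = 10 ms all pass and every further request is rejected until t = 1000 ms, which is exactly the spike phenomenon described above.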
Leaky bucket algorithm
To eliminate the “spike phenomenon”, you can use the leaky bucket algorithm. The name is very vivid: the algorithm contains a container, like a funnel in everyday life. No matter how much flows in at the top, the outflow at the bottom stays constant. However unstable the callers are, a limiter based on the leaky bucket processes requests at a fixed pace, for example one request every 10 ms. Because the processing rate is fixed while the rate of incoming requests is unknown, many requests may arrive at once; requests that cannot be processed yet are put into the bucket first. Since it is a bucket, it has a capacity limit, and requests that arrive when it is full are rejected.
For the implementation, prepare a queue to hold requests and use a thread pool (ScheduledExecutorService) to periodically take requests from the queue and execute them; several requests can also be taken at once and executed concurrently. The drawback of this algorithm is that it cannot handle short bursts of traffic.
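A minimal sketch of the queue plus ScheduledExecutorService idea. LeakyBucketLimiter and its methods are hypothetical names; the bounded queue is the bucket, and leakOnce() is exposed separately so the behaviour can be checked without relying on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class LeakyBucketLimiter {
    private final BlockingQueue<Runnable> bucket;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    LeakyBucketLimiter(int capacity) {
        // The bounded queue is the bucket; its capacity is the bucket size.
        this.bucket = new ArrayBlockingQueue<>(capacity);
    }

    /** Puts a request into the bucket; returns false (request rejected) if the bucket is full. */
    boolean submit(Runnable request) {
        return bucket.offer(request);
    }

    /** Lets one request leak out and runs it; returns false if the bucket was empty. */
    boolean leakOnce() {
        Runnable next = bucket.poll();
        if (next == null) {
            return false;
        }
        next.run();
        return true;
    }

    /** Leaks one request every periodMillis milliseconds, e.g. 10 ms. */
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                leakOnce();
            } catch (RuntimeException e) {
                // A failing request must not cancel the periodic task.
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

However many requests are submitted at once, at most one runs per period, which is why a short burst is queued or rejected rather than served.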
Token bucket algorithm
In a sense, the token bucket algorithm is an improvement on the leaky bucket algorithm. The leaky bucket limits the rate of request invocation, while the token bucket limits the average rate of invocation and still allows a certain degree of burst. In the token bucket algorithm, a bucket holds up to a fixed number of tokens, and a mechanism adds tokens to the bucket at a constant rate. Each request must obtain a token before it can proceed; otherwise it either waits for a token to become available or is rejected outright. Tokens are added continuously, and once the bucket is full, further tokens are discarded. As a result, the bucket may hold many available tokens, and incoming requests can take them immediately. For example, with QPS set to 100, one second after the limiter is initialized the bucket already holds 100 tokens; if the service has not finished starting by then, once it is ready the limiter can absorb 100 requests instantly. So a request waits only when the bucket has no tokens, and in the long run requests execute at the configured rate.
You can prepare a queue to store tokens, and use a thread pool to periodically generate tokens and put them in the queue. Each request will fetch a token from the queue and continue execution.
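A minimal sketch of the queue-of-tokens approach just described. QueueTokenBucket is a hypothetical class; it starts full (so a burst of `capacity` requests can pass at once), refill() is exposed so the behaviour can be checked without timing, and start() schedules the periodic token generation. The Redis script below takes a different route and computes the refill from elapsed time instead of using a background thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class QueueTokenBucket {
    private static final Object TOKEN = new Object();
    private final BlockingQueue<Object> tokens;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    QueueTokenBucket(int capacity) {
        tokens = new ArrayBlockingQueue<>(capacity);
        // Start full, so up to `capacity` requests can pass in a burst.
        for (int i = 0; i < capacity; i++) {
            tokens.offer(TOKEN);
        }
    }

    /** Adds one token; offer() drops it when the bucket is already full. */
    void refill() {
        tokens.offer(TOKEN);
    }

    /** Takes a token if one is available; otherwise the request is rejected. */
    boolean tryAcquire() {
        return tokens.poll() != null;
    }

    /** Waits up to the given timeout for a token to become available. */
    boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        return tokens.poll(timeout, unit) != null;
    }

    /** Generates permitsPerSecond tokens per second (must be between 1 and 1,000,000). */
    void start(int permitsPerSecond) {
        long periodMicros = 1_000_000L / permitsPerSecond;
        scheduler.scheduleAtFixedRate(this::refill, periodMicros, periodMicros, TimeUnit.MICROSECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```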
Token bucket rate limiting with Redis + Lua: a walkthrough
-- The key of the token bucket in Redis
local tokens_key = KEYS[1]
-- The key holding the time the token bucket was last refreshed
local timestamp_key = KEYS[2]
-- Token fill rate per unit time
local rate = tonumber(ARGV[1])
-- Token bucket capacity
local capacity = tonumber(ARGV[2])
-- Current time (in seconds, the same time unit as rate)
local now = tonumber(ARGV[3])
-- The number of tokens required by the request
local requested = tonumber(ARGV[4])
-- Time required to fill the token bucket = capacity / fill rate
local fill_time = capacity/rate
-- Key expiration time (TTL): twice the fill time
local ttl = math.floor(fill_time*2)
-- Get the number of tokens left in the bucket after the last request
local last_tokens = tonumber(redis.call("get", tokens_key))
-- If there is no value, the token bucket is either new (it did not exist before) or has not been used
-- for so long that it expired; either way it is initialized as a full bucket
if last_tokens == nil then
last_tokens = capacity
end
-- Get the time of the last refresh; if it is missing or has expired, use 0
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
-- Time elapsed between the last refresh and now
local delta = math.max(0, now-last_refreshed)
-- delta*rate = the number of tokens added during this interval
-- Tokens now in the bucket = tokens added + tokens already in the bucket,
-- capped at the bucket capacity
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- Check whether the bucket holds at least the number of tokens this request needs; if not, the request is rate limited
local allowed = filled_tokens >= requested
-- Two variables: the new token count, and whether the request is allowed (0 = rate limited, 1 = allowed)
local new_tokens = filled_tokens
local allowed_num = 0
-- If filled_tokens >= requested:
-- new token count = tokens just calculated minus the tokens used by this request,
-- and the request is marked as allowed
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
-- Store the number of tokens in the token bucket after this operation and the refresh time
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
-- Returns whether the flow is restricted and the number of tokens left in the token bucket
return { allowed_num, new_tokens }
- KEYS[1], KEYS[2], ARGV[1], ARGV[2], … refer to the argument lists passed in when the Lua script is called.
- KEYS[i] is the i-th key name passed to the script.
- ARGV[i] is the i-th additional argument passed to the script. ARGV values arrive as strings, which is why the script converts them with tonumber.
- For example, if the script is called with KEYS = [demo1, demo2] and ARGV = [3], then KEYS[1] is "demo1", so redis.call("get", KEYS[1]) reads the value of demo1, and ARGV[1] is "3".
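To check the script's arithmetic without a Redis server, here is a line-by-line transcription into plain Java. It assumes timestamps in seconds and passes the two values read from Redis in as parameters (null meaning the key is missing or expired). TokenBucketMath is a hypothetical helper, not part of the original code.

```java
/** Plain-Java transcription of the token bucket Lua script's arithmetic (illustrative only). */
final class TokenBucketMath {
    static final class Result {
        final int allowed;      // 1 = request passes, 0 = rate limited
        final double newTokens; // tokens the script writes back with SETEX

        Result(int allowed, double newTokens) {
            this.allowed = allowed;
            this.newTokens = newTokens;
        }
    }

    /** rate is tokens per second; now and lastRefreshed are in seconds. */
    static Result evaluate(Double lastTokens, Double lastRefreshed,
                           double rate, double capacity, double now, double requested) {
        // A missing (new or expired) bucket starts full; a missing timestamp counts as 0.
        double tokens = lastTokens == null ? capacity : lastTokens;
        double refreshed = lastRefreshed == null ? 0.0 : lastRefreshed;
        double delta = Math.max(0, now - refreshed);
        // Refill for the elapsed time, but never beyond the bucket capacity.
        double filled = Math.min(capacity, tokens + delta * rate);
        if (filled >= requested) {
            return new Result(1, filled - requested);
        }
        return new Result(0, filled);
    }

    /** ttl = math.floor(fill_time * 2), where fill_time = capacity / rate. */
    static long ttlSeconds(double capacity, double rate) {
        return (long) Math.floor(capacity / rate * 2);
    }
}
```

The first call on an empty bucket with rate 1 and capacity 3 yields allowed = 1 with 2 tokens left, the same result as the Spring Boot call below; after three requests in the same second the fourth is limited, and two seconds later two tokens are available again.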
Calling the Redis Lua script from Spring Boot
Add the dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
Create a scripts folder on the classpath (for example under src/main/resources) and put the Lua script in it as scripts/redis_rate_limit.lua
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }
Construct a RedisScript object
Spring abstracts each Lua script into a RedisScript object. Its implementation DefaultRedisScript can take the script in two ways: setScriptSource, which reads it from a resource as an IO stream, and setScriptText, which takes the script directly as a string. Here we load it from a resource. The generic type of the object is the return type of the Lua script; our script returns two integers, so we receive them as a List<Long>.
@Bean(name = "rateLimitRedisScript")
public RedisScript<List<Long>> rateLimitRedisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
// redisScript.setScriptText();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/redis_rate_limit.lua")));
redisScript.setResultType(List.class);
return redisScript;
}
Set the Redis serialization rules
@Bean
@ConditionalOnMissingBean(StringRedisTemplate.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean(RedisTemplate.class)
public RedisTemplate<String, Object> redisTemplate( RedisConnectionFactory redisConnectionFactory)
throws UnknownHostException {
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(jackson2JsonRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashKeySerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
Call the Lua script
@Resource
private RedisScript<List<Long>> rateLimitRedisScript;
@Resource
private StringRedisTemplate stringRedisTemplate;
@GetMapping
public List<Long> userToken() {
// ARGV of the Lua script: rate (tokens per second), capacity, now (epoch seconds), requested tokens
List<String> scriptArgs = Arrays.asList(
1 + "", 3 + "",
Instant.now().getEpochSecond() + "", "1");
// Set the lua script KEYS value
List<String> keys = getKeys("test");
return stringRedisTemplate.execute(rateLimitRedisScript,keys, scriptArgs.toArray());
}
private List<String> getKeys(String id) {
// use `{}` around keys to use Redis Key hash tags
// this allows for using redis cluster
// Make a unique key per user.
String prefix = "request_rate_limiter.{" + id;
// You need two Redis keys for Token Bucket.
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
After a successful call, two numbers are returned: the first is the result flag (0 means the request was rate limited, 1 means it was allowed), and the second is the number of tokens left in the token bucket. For example:
[1, 2]
Notes
- Why are the keys wrapped in braces? In cluster mode, a Redis Lua script can only run successfully if all the keys it uses are in the same hash slot. When a key contains braces, Redis hashes only the characters inside the braces, which guarantees that the keys used by the script land in the same slot, so the script executes normally in cluster mode. However, the content inside the braces must not be a constant. Otherwise a large number of keys would be assigned to the same slot, causing hash skew and an uneven distribution of keys.
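- We execute the Lua script with StringRedisTemplate rather than RedisTemplate, because executing it with the RedisTemplate configured above results in an error (most likely because its Jackson serializer JSON-encodes the ARGV values, so "1" is sent as "\"1\"", which tonumber in the script cannot parse).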
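The hash-tag rule from the first note can be illustrated in Java. Redis Cluster maps a key to slot CRC16(key) mod 16384, and when the key contains a non-empty {...} section only that part is hashed. RedisKeySlot is a hypothetical helper written for this illustration; on a real cluster, CLUSTER KEYSLOT gives the authoritative answer.

```java
import java.nio.charset.StandardCharsets;

/** Computes the Redis Cluster hash slot of a key, honouring {hash tags} (illustrative sketch). */
final class RedisKeySlot {
    /** CRC16/XMODEM (polynomial 0x1021, initial value 0), the CRC variant used by Redis Cluster. */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? ((crc << 1) ^ 0x1021) & 0xFFFF : (crc << 1) & 0xFFFF;
            }
        }
        return crc;
    }

    static int keySlot(String key) {
        int open = key.indexOf('{');
        if (open >= 0) {
            int close = key.indexOf('}', open + 1);
            // Only a non-empty {...} section is used as the hash tag.
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % 16384;
    }
}
```

Both keys built by getKeys("test") hash only "test", so they share a slot; by the same rule, if every key used the same constant inside the braces, all of them would pile into one slot.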
Rate limiting interfaces with AOP + Redis Lua
If a token is obtained, the request is not rate limited and is processed normally. If no token can be obtained, the request is rate limited and fails, and a RateLimitException is thrown.
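The decision the aspect makes can be sketched without Spring. In this hypothetical RateLimitGuard, a BiFunction stands in for stringRedisTemplate.execute(script, keys, args) and returns [allowed, remainingTokens], a Supplier plays the role of pjp.proceed(), and RateLimitException is a stand-in for the project's own exception class, whose real definition is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical stand-in for the project's RateLimitException. */
class RateLimitException extends RuntimeException {
    RateLimitException(String key) {
        super("rate limited: " + key);
    }
}

/** Framework-free sketch of the around advice: run the script, then either proceed or throw. */
class RateLimitGuard {
    // Stand-in for stringRedisTemplate.execute(script, keys, args).
    private final BiFunction<List<String>, List<String>, List<Long>> scriptExecutor;

    RateLimitGuard(BiFunction<List<String>, List<String>, List<Long>> scriptExecutor) {
        this.scriptExecutor = scriptExecutor;
    }

    <T> T invoke(String key, List<String> scriptArgs, Supplier<T> target) {
        // Same hash-tagged key layout as getKeys(): both keys share the {key} part.
        List<String> keys = Arrays.asList("request_rate_limiter.{" + key + "}.tokens",
                "request_rate_limiter.{" + key + "}.timestamp");
        List<Long> response = scriptExecutor.apply(keys, scriptArgs);
        if (response == null) {
            throw new IllegalStateException("redis execute redis lua limit failed.");
        }
        if (response.get(0) <= 0) {
            throw new RateLimitException(key); // no token: the request is rejected
        }
        return target.get(); // token obtained: plays the role of pjp.proceed()
    }
}
```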
The final result
- I planned to turn this into a rate-limiting tool based on Spring Boot's manual assembly (an @Enable annotation, see below) and package it as a JAR, so that other projects that need it can include it directly instead of developing it again.
- Here’s how to use it:
- Enable the rate-limiting tool by adding @EnableRedisRateLimit to a configuration class.
- Add @RateLimit to each interface that needs rate limiting, and set the rules for the specific scenario, for example:
@RateLimit(replenishRate = 3,burstCapacity = 300)
@GetMapping("test-limit")
public Result<Void> testLimit(){
return Result.buildSuccess();
}
Core code overview
- @RateLimit: to make it easy to extend to different scenarios, the rate-limiting dimension is specified by an implementation of the KeyResolver interface (by default UriKeyResolver, which limits by request URI).
- LimitProperties: by default the parameters in the annotation are used, but for extensibility a LimitProperties interface is also provided. If limitProperties is specified, its configuration takes precedence over the annotation parameters.
- The Lua script described earlier can only limit traffic per second. I made a small change to it so that it supports limiting per second, minute, hour, and day.
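With the modified script, the caller passes `now` already divided into whole time units, plus a fifth argument with the number of seconds per unit, which the script uses to express the TTL in seconds. TimeUnitArgs is a hypothetical helper that mirrors getCurrentTimeLong and the script's ttl line; it assumes the epoch time is taken in seconds.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing the time-unit arithmetic behind the modified script's arguments. */
final class TimeUnitArgs {
    /** Seconds per unit: 1, 60, 3600 or 86400, the same values as getCurrentTimeLong. */
    static long secondsPerUnit(TimeUnit unit) {
        switch (unit) {
            case SECONDS:
            case MINUTES:
            case HOURS:
            case DAYS:
                return unit.toSeconds(1);
            default:
                throw new IllegalArgumentException("timeUnit:" + unit + " not support");
        }
    }

    /** "now" in whole time units, so that delta * rate counts tokens per time unit. */
    static long nowInUnits(Instant now, TimeUnit unit) {
        return now.getEpochSecond() / secondsPerUnit(unit);
    }

    /** Mirrors ttl = math.floor((fill_time * time_unit) * 2); the result is in seconds. */
    static long ttlSeconds(int replenishRate, int burstCapacity, TimeUnit unit) {
        double fillTimeInUnits = (double) burstCapacity / replenishRate;
        return (long) Math.floor(fillTimeInUnits * secondsPerUnit(unit) * 2);
    }
}
```

For example, with replenishRate = 3, burstCapacity = 300 and TimeUnit.MINUTES, the bucket takes 100 minutes to fill, so both keys get a TTL of 12000 seconds.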
- Rate limiting annotation: RateLimit
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
/**
 * Rate-limiting dimension. By default, the request URI is used.
 *
 * @return uri
 */
Class<? extends KeyResolver> keyResolver() default UriKeyResolver.class;
/**
 * Rate-limiting configuration. If a custom implementation is specified, it takes precedence over the values below.
 *
 * @return limitProp
 */
Class<? extends LimitProperties> limitProperties() default DefaultLimitProperties.class;
/**
 * Average token bucket fill rate per time unit (per second by default).
 *
 * @return replenishRate
 */
int replenishRate() default 1;
/**
 * Total token bucket capacity.
 *
 * @return burstCapacity
 */
int burstCapacity() default 3;
/**
 * Rate-limiting time unit, seconds by default.
 * Supported: {@link TimeUnit#SECONDS}, {@link TimeUnit#MINUTES},
 * {@link TimeUnit#HOURS}, {@link TimeUnit#DAYS}.
 *
 * @return TimeUnit
 * @since 1.0.2
 */
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
- Rate limiting configuration: LimitProperties
public interface LimitProperties {
/**
 * Average token bucket fill rate per time unit.
 *
 * @return replenishRate
 */
int replenishRate();
/**
 * Total token bucket capacity.
 *
 * @return burstCapacity
 */
int burstCapacity();
/**
 * Rate-limiting time unit, seconds by default.
 * Supported: {@link TimeUnit#SECONDS}, {@link TimeUnit#MINUTES},
 * {@link TimeUnit#HOURS}, {@link TimeUnit#DAYS}.
 *
 * @return TimeUnit
 * @since 1.0.2
 */
TimeUnit timeUnit();
}
- Rate limiting Lua script
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
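-- Seconds per time unit (1, 60, 3600 or 86400), passed in by the caller
local time_unit = tonumber(ARGV[5])
-- The time it takes to fill the token bucket, in time units
local fill_time = capacity/rate
-- Key TTL in seconds: twice the fill time
local ttl = math.floor((fill_time*time_unit)*2)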
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed_num, new_tokens }
- The core aspect (AOP)
@Slf4j
@Aspect
public class RateLimitInterceptor implements ApplicationContextAware {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedisScript<List<Long>> rateLimitRedisScript;
private ApplicationContext applicationContext;
@Around("execution(public * *(..) ) && @annotation(org.ywb.aoplimiter.anns.RateLimit)")
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
RateLimit rateLimit = method.getAnnotation(RateLimit.class);
// Throws RateLimitException if the request is rate limited
assertNonLimit(rateLimit, pjp);
return pjp.proceed();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public void assertNonLimit(RateLimit rateLimit, ProceedingJoinPoint pjp) {
Class<? extends KeyResolver> keyResolverClazz = rateLimit.keyResolver();
KeyResolver keyResolver = applicationContext.getBean(keyResolverClazz);
String resolve = keyResolver.resolve(HttpContentHelper.getCurrentRequest(), pjp);
List<String> keys = getKeys(resolve);
LimitProperties limitProperties = getLimitProperties(rateLimit);
// Seconds per time unit, derived from the rate-limiting time dimension
long timeLong = getCurrentTimeLong(limitProperties.timeUnit());
// Lua script arguments: rate, capacity, now (epoch seconds divided into time units), requested tokens, seconds per time unit
List<String> scriptArgs = Arrays.asList(limitProperties.replenishRate() + "",
limitProperties.burstCapacity() + "", (Instant.now().getEpochSecond() / timeLong) + "", "1", timeLong + "");
// The first element indicates whether the request is allowed, the second is the number of remaining tokens
List<Long> rateLimitResponse = this.stringRedisTemplate.execute(this.rateLimitRedisScript, keys, scriptArgs.toArray());
Assert.notNull(rateLimitResponse, "redis execute redis lua limit failed.");
Long isAllowed = rateLimitResponse.get(0);
Long newTokens = rateLimitResponse.get(1);
log.info("rate limit key [{}] result: isAllowed [{}] new tokens [{}].", resolve, isAllowed, newTokens);
if (isAllowed <= 0) {
throw new RateLimitException(resolve);
}
}
private LimitProperties getLimitProperties(RateLimit rateLimit) {
Class<? extends LimitProperties> aClass = rateLimit.limitProperties();
if (aClass == DefaultLimitProperties.class) {
// Select the configuration in the annotation
return new DefaultLimitProperties(rateLimit.replenishRate(), rateLimit.burstCapacity(), rateLimit.timeUnit());
}
// Use the user's own configuration classes first
return applicationContext.getBean(aClass);
}
private long getCurrentTimeLong(TimeUnit timeUnit) {
switch (timeUnit) {
case SECONDS:
return 1;
case MINUTES:
return 60;
case HOURS:
return 60 * 60;
case DAYS:
return 60 * 60 * 24;
default:
throw new IllegalArgumentException("timeUnit:" + timeUnit + " not support");
}
}
private List<String> getKeys(String id) {
// use `{}` around keys to use Redis Key hash tags
// this allows for using redis cluster
// Make a unique key per user.
String prefix = "request_rate_limiter.{" + id;
// You need two Redis keys for Token Bucket.
String tokenKey = prefix + "}.tokens";
String timestampKey = prefix + "}.timestamp";
return Arrays.asList(tokenKey, timestampKey);
}
}
That’s all of the core code. The complete source code is on GitHub:
Github.com/xiao-ren-wu…
If you find it helpful, please give it a star. Thank you!