The previous article (Traffic limiting algorithm and Guava RateLimiter Parsing) introduced the common rate limiting algorithms and Google Guava's RateLimiter, which implements the token bucket algorithm. RateLimiter synchronizes with thread locks, so it is only suitable for single-instance applications. For distributed environments there are open source rate limiting frameworks such as Alibaba Sentinel, but they are too heavyweight for some small applications. Small projects still need rate limiting, though, for example to control how often SMS verification codes can be requested or to limit the access frequency of resource-intensive operations. This article describes a recently written rate limiting implementation for distributed environments, modeled on RateLimiter and packaged as a spring-boot-starter, so it is lightweight and works out of the box.

The rate limiting implementation in this article takes two forms:

  1. Rate control based on the RateLimiter token bucket algorithm (strictly limits the access rate)
  2. Count control based on a Lua script (limits the number of accesses within a time window, without strictly limiting the access rate)

Rate control

1. Token bucket model

First, define a token bucket model, similar to that in RateLimiter, with several key attributes and key methods. Key attributes are defined as follows:

@Data
public class RedisPermits {

    /** Maximum number of stored tokens */
    private double maxPermits;
    /** Number of tokens currently stored */
    private double storedPermits;
    /** Interval between token additions, in milliseconds */
    private double intervalMillis;
    /** Time at which the next request can obtain tokens; may be in the past (tokens have accumulated) or in the future (tokens were pre-consumed) */
    private long nextFreeTicketMillis;

    // ...

The key methods are defined much as in RateLimiter, and the comment on each method describes its purpose.

    /**
     * Builds the Redis token bucket model.
     *
     * @param permitsPerSecond     number of tokens added per second
     * @param maxBurstSeconds      determines maxPermits: the bucket stores at most the tokens generated in maxBurstSeconds
     * @param nextFreeTicketMillis start time from which the next request can obtain tokens; defaults to the current system time
     */
    public RedisPermits(double permitsPerSecond, double maxBurstSeconds, Long nextFreeTicketMillis) {
        this.maxPermits = permitsPerSecond * maxBurstSeconds;
        this.storedPermits = maxPermits;
        this.intervalMillis = TimeUnit.SECONDS.toMillis(1) / permitsPerSecond;
        this.nextFreeTicketMillis = nextFreeTicketMillis;
    }

    /**
     * Resynchronizes the bucket to the current time: if the current time is later than nextFreeTicketMillis,
     * computes how many tokens were generated in between, adds them to the bucket, and updates the timestamp.
     */
    public void resync(long nowMillis) {
        if (nowMillis > nextFreeTicketMillis) {
            double newPermits = (nowMillis - nextFreeTicketMillis) / intervalMillis;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMillis = nowMillis;
        }
    }

    /** Reserves the specified number of tokens and returns the time to wait, in milliseconds */
    public long reserveAndGetWaitLength(long nowMillis, int permits) {
        resync(nowMillis);
        double storedPermitsToSpend = Math.min(permits, storedPermits); // Number of tokens that can be consumed
        double freshPermits = permits - storedPermitsToSpend; // The number of tokens to wait for
        long waitMillis = (long) (freshPermits * intervalMillis); // The time required to wait

        nextFreeTicketMillis = LongMath.saturatedAdd(nextFreeTicketMillis, waitMillis);
        storedPermits -= storedPermitsToSpend;
        return waitMillis;
    }

    /** Returns whether the specified number of tokens can be obtained within the timeout period */
    public boolean canAcquire(long nowMillis, int permits, long timeoutMillis) {
        return queryEarliestAvailable(nowMillis, permits) <= timeoutMillis;
    }

    /**
     * Returns how long to wait until the specified number of tokens is available.
     *
     * @param permits number of tokens to reserve
     * @return wait time until the tokens are available; zero or negative means they are available now
     */
    private long queryEarliestAvailable(long nowMillis, int permits) {
        resync(nowMillis);
        double storedPermitsToSpend = Math.min(permits, storedPermits); // Number of tokens that can be consumed
        double freshPermits = permits - storedPermitsToSpend; // The number of tokens to wait for
        long waitMillis = (long) (freshPermits * intervalMillis); // The time required to wait

        return LongMath.saturatedAdd(nextFreeTicketMillis - nowMillis, waitMillis);
    }
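
To make the arithmetic of resync and reserveAndGetWaitLength concrete, here is a minimal in-memory sketch. It mirrors the logic shown above but is not the article's class: the name TokenBucketSketch is made up for illustration, Redis storage and locking are left out, and a plain addition stands in for Guava's LongMath.saturatedAdd.

```java
/** In-memory sketch of the token bucket arithmetic; Redis storage and locking are left out. */
public class TokenBucketSketch {
    private final double maxPermits;
    private final double intervalMillis;
    private double storedPermits;
    private long nextFreeTicketMillis;

    public TokenBucketSketch(double permitsPerSecond, double maxBurstSeconds, long nowMillis) {
        this.maxPermits = permitsPerSecond * maxBurstSeconds;
        this.storedPermits = maxPermits;                 // the bucket starts full
        this.intervalMillis = 1000.0 / permitsPerSecond; // milliseconds per token
        this.nextFreeTicketMillis = nowMillis;
    }

    /** Credits the tokens generated since nextFreeTicketMillis, capped at maxPermits. */
    private void resync(long nowMillis) {
        if (nowMillis > nextFreeTicketMillis) {
            double newPermits = (nowMillis - nextFreeTicketMillis) / intervalMillis;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMillis = nowMillis;
        }
    }

    /** Spends stored tokens first; any shortfall is paid for with waiting time. */
    public long reserveAndGetWaitLength(long nowMillis, int permits) {
        resync(nowMillis);
        double storedPermitsToSpend = Math.min(permits, storedPermits);
        double freshPermits = permits - storedPermitsToSpend;
        long waitMillis = (long) (freshPermits * intervalMillis);
        nextFreeTicketMillis += waitMillis; // plain addition here; the article uses a saturating add
        storedPermits -= storedPermitsToSpend;
        return waitMillis;
    }

    public static void main(String[] args) {
        // 0.5 tokens per second, at most 2 seconds' worth stored: capacity 1, one new token every 2000 ms
        TokenBucketSketch bucket = new TokenBucketSketch(0.5, 2, 0L);
        System.out.println(bucket.reserveAndGetWaitLength(0L, 1));      // 0: the stored token is spent
        System.out.println(bucket.reserveAndGetWaitLength(500L, 1));    // 1500: only a quarter of a token has accrued
        System.out.println(bucket.reserveAndGetWaitLength(10_000L, 1)); // 0: the bucket has refilled
    }
}
```

Passing the clock in as a parameter, as the article's model does, keeps the arithmetic deterministic and easy to check by hand.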

2. Token bucket control class

In Guava, all of the control logic lives in RateLimiter and its subclasses (such as SmoothBursty). Here, synchronization has to work in a distributed environment, so the two concerns are decoupled: the token bucket model is stored in Redis, and the synchronized operations on it are placed in the control class below. Synchronization uses the distributed lock introduced earlier (see the earlier article on the correct way to use Redis-based distributed locks).

@Slf4j
public class RedisRateLimiter {

    /**
     * Acquires one token, blocking until it is available.
     *
     * @return time spent blocked, in milliseconds
     */
    public long acquire(String key) throws IllegalArgumentException {
        return acquire(key, 1);
    }

    /**
     * Acquires the specified number of tokens, blocking until enough tokens are available.
     *
     * @param permits number of tokens to acquire
     * @return time spent blocked, in milliseconds
     * @throws IllegalArgumentException if permits is negative or zero
     */
    public long acquire(String key, int permits) throws IllegalArgumentException {
        long millisToWait = reserve(key, permits);
        log.info("acquire {} permits for key[{}], waiting for {}ms", permits, key, millisToWait);
        try {
            Thread.sleep(millisToWait);
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to acquire {} permits for key[{}]", permits, key, e);
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
        }
        return millisToWait;
    }

    /**
     * Tries to acquire one token within the given timeout, blocking at most until the timeout.
     *
     * @param timeout maximum time to wait; 0 means return immediately without waiting
     * @param unit    time unit of timeout
     * @return true if a token was obtained, false otherwise
     * @throws IllegalArgumentException
     */
    public boolean tryAcquire(String key, long timeout, TimeUnit unit) throws IllegalArgumentException {
        return tryAcquire(key, 1, timeout, unit);
    }

    /**
     * Tries to acquire the specified number of tokens within the given timeout. Returns false if the tokens
     * cannot be obtained within the timeout; otherwise blocks until they are available.
     *
     * @param permits number of tokens to acquire
     * @param timeout maximum time to wait
     * @param unit    time unit of timeout
     * @return true if the tokens can be obtained within the timeout, false otherwise
     * @throws IllegalArgumentException if permits is negative or zero
     */
    public boolean tryAcquire(String key, int permits, long timeout, TimeUnit unit) throws IllegalArgumentException {
        long timeoutMillis = Math.max(unit.toMillis(timeout), 0);
        checkPermits(permits);

        long millisToWait;
        boolean locked = false;
        try {
            locked = lock.lock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId(), 60, 2, TimeUnit.SECONDS);
            if (locked) {
                long nowMillis = getNowMillis();
                RedisPermits permit = getPermits(key, nowMillis);
                if (!permit.canAcquire(nowMillis, permits, timeoutMillis)) {
                    return false;
                } else {
                    millisToWait = permit.reserveAndGetWaitLength(nowMillis, permits);
                    permitsRedisTemplate.opsForValue().set(key, permit, expire, TimeUnit.SECONDS);
                }
            } else {
                return false;  // the lock was not acquired before its timeout
            }
        } finally {
            if (locked) {
                lock.unLock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId());
            }
        }
        if (millisToWait > 0) {
            try {
                Thread.sleep(millisToWait);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
            }
        }
        return true;
    }

    /**
     * Reserves the specified number of tokens for use.
     *
     * @param permits number of tokens to reserve
     * @return time to wait until the tokens are available
     * @throws IllegalArgumentException if permits is negative or zero
     */
    private long reserve(String key, int permits) throws IllegalArgumentException {
        checkPermits(permits);
        try {
            lock.lock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId(), 60, 2, TimeUnit.SECONDS);
            long nowMillis = getNowMillis();
            RedisPermits permit = getPermits(key, nowMillis);
            long waitMillis = permit.reserveAndGetWaitLength(nowMillis, permits);
            permitsRedisTemplate.opsForValue().set(key, permit, expire, TimeUnit.SECONDS);
            return waitMillis;
        } finally {
            lock.unLock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId());
        }
    }

    /** Gets the token bucket for the key, creating a new one if none is stored */
    private RedisPermits getPermits(String key, long nowMillis) {
        RedisPermits permit = permitsRedisTemplate.opsForValue().get(key);
        if (permit == null) {
            permit = new RedisPermits(permitsPerSecond, maxBurstSeconds, nowMillis);
        }
        return permit;
    }

    /** * Get redis server time */
    private long getNowMillis() {
        String luaScript = "return redis.call('time')";
        DefaultRedisScript<List> redisScript = new DefaultRedisScript<>(luaScript, List.class);
        // TIME returns [Unix time in seconds, microseconds elapsed in the current second]
        List<String> now = (List<String>) stringRedisTemplate.execute(redisScript, null);
        return now == null ? System.currentTimeMillis() : Long.parseLong(now.get(0)) * 1000 + Long.parseLong(now.get(1)) / 1000;
    }

    // ...
}

Among them:

  1. acquire is a blocking method: if no token is available, it blocks until one is acquired.
  2. tryAcquire returns false immediately, without waiting, if the requested number of tokens cannot be obtained within the timeout; otherwise it blocks until the tokens are available.
  3. getNowMillis reads the time from the Redis server so that clock differences between application servers do not matter. If the application servers' clocks are reliably synchronized, the local system time can be used instead, which is more efficient.
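
The conversion done in getNowMillis can be checked in isolation. The Redis TIME command replies with two strings, the Unix time in seconds and the microseconds elapsed in the current second. The sketch below applies the same arithmetic to a made-up sample reply; the class and method names are hypothetical and no Redis connection is involved.

```java
import java.util.Arrays;
import java.util.List;

public class RedisTimeSketch {

    /** Converts a TIME reply [seconds, microseconds] to epoch milliseconds. */
    static long toMillis(List<String> timeReply) {
        long seconds = Long.parseLong(timeReply.get(0));
        long micros = Long.parseLong(timeReply.get(1));
        return seconds * 1000 + micros / 1000;
    }

    public static void main(String[] args) {
        List<String> reply = Arrays.asList("1700000000", "123456"); // made-up sample reply
        System.out.println(toMillis(reply)); // 1700000000123
    }
}
```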

3. Token bucket control factory class

The factory class manages the token bucket control classes and caches them locally using Guava's Cache. This avoids creating a new control class on every call, which improves efficiency, and it caps the cache size so that cases such as per-user rate limiting do not consume too much memory.

public class RedisRateLimiterFactory {

    private PermitsRedisTemplate permitsRedisTemplate;
    private StringRedisTemplate stringRedisTemplate;
    private DistributedLock distributedLock;

    private Cache<String, RedisRateLimiter> cache = CacheBuilder.newBuilder()
            .initialCapacity(100)  // Initial size
            .maximumSize(10000) // Maximum capacity of cache
            .expireAfterAccess(5, TimeUnit.MINUTES) // Expire entries 5 minutes after they were last accessed
            .concurrencyLevel(Runtime.getRuntime().availableProcessors()) // Set the concurrency level
            .build();

    public RedisRateLimiterFactory(PermitsRedisTemplate permitsRedisTemplate, StringRedisTemplate stringRedisTemplate, DistributedLock distributedLock) {
        this.permitsRedisTemplate = permitsRedisTemplate;
        this.stringRedisTemplate = stringRedisTemplate;
        this.distributedLock = distributedLock;
    }

    /**
     * Creates a RateLimiter.
     *
     * @param key              local cache key of the RedisRateLimiter
     * @param permitsPerSecond number of tokens added per second
     * @param maxBurstSeconds  the bucket stores at most the tokens generated in maxBurstSeconds
     * @param expire           Redis TTL of the token bucket, in seconds
     * @return RateLimiter
     */
    public RedisRateLimiter build(String key, double permitsPerSecond, double maxBurstSeconds, int expire) {
        RedisRateLimiter limiter = cache.getIfPresent(key);
        if (limiter == null) {
            synchronized (this) {
                limiter = cache.getIfPresent(key);
                if (limiter == null) {
                    limiter = new RedisRateLimiter(permitsRedisTemplate, stringRedisTemplate, distributedLock, permitsPerSecond, maxBurstSeconds, expire);
                    cache.put(key, limiter);
                }
            }
        }
        // Return the local reference so that an eviction between put and get cannot yield null
        return limiter;
    }
}
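
The idea behind the factory, one reusable limiter per key with a hard cap on how many are kept, can be illustrated without Guava. The sketch below swaps in a different technique, an access-ordered LinkedHashMap from the JDK acting as a small LRU cache. It is not the article's implementation: BoundedCacheSketch is a hypothetical name, and it has no time-based expiry like expireAfterAccess.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class BoundedCacheSketch<V> {
    private final Map<String, V> entries;

    public BoundedCacheSketch(final int maximumSize) {
        // accessOrder = true keeps the least recently accessed entry first, so it is the one evicted
        this.entries = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maximumSize;
            }
        };
    }

    /** Returns the cached value for key, creating and caching it on first use. */
    public synchronized V getOrCreate(String key, Supplier<V> factory) {
        V value = entries.get(key);
        if (value == null) {
            value = factory.get();
            entries.put(key, value);
        }
        return value;
    }

    public synchronized boolean contains(String key) {
        return entries.containsKey(key);
    }

    public static void main(String[] args) {
        BoundedCacheSketch<Object> cache = new BoundedCacheSketch<>(2);
        Object a = cache.getOrCreate("a", Object::new);
        cache.getOrCreate("b", Object::new);
        System.out.println(a == cache.getOrCreate("a", Object::new)); // true: the same instance is reused
        cache.getOrCreate("c", Object::new);                          // exceeds the cap and evicts "b"
        System.out.println(cache.contains("b"));                      // false
        System.out.println(cache.contains("a"));                      // true
    }
}
```

With per-user keys the number of distinct limiters is unbounded, which is why some cap on the local cache matters whatever cache implementation is used.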

4. Annotation support

The @RateLimit annotation is defined as follows: tokens are added at rate per second, at most burst seconds' worth of tokens are retained, and timeout is the timeout for acquiring a token. limitType controls the key type; the currently supported values are:

  1. IP: limit by client IP address
  2. USER: limit by user. With Spring Security, the current user's information, such as userId, can be obtained from SecurityContextHolder
  3. METHOD: limit by method, keyed as className.methodName. Methods with the same name in the same class share a key, so do not rate limit them at the same time; otherwise the key-generation logic needs to be modified
  4. CUSTOM: a user-specified key, which supports expressions such as #{id} and #{user.id}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimit {
    String key() default "";
    String prefix() default "rateLimit:"; // key prefix
    int expire() default 60; // Redis key expiration time of the RedisPermits token bucket model, in seconds
    double rate() default 1.0; // permitsPerSecond value
    double burst() default 1.0; // maxBurstSeconds value
    int timeout() default 0; // timeout, in seconds
    LimitType limitType() default LimitType.METHOD;
}

To apply rate limiting to methods annotated with @RateLimit, the aspect's before advice is defined as follows:

@Aspect
@Slf4j
public class RedisLimitAspect {
    // ...

    @Before(value = "@annotation(rateLimit)")
    public void rateLimit(JoinPoint point, RateLimit rateLimit) throws Throwable {
        String key = getKey(point, rateLimit.limitType(), rateLimit.key(), rateLimit.prefix());
        RedisRateLimiter redisRateLimiter = redisRateLimiterFactory.build(key, rateLimit.rate(), rateLimit.burst(), rateLimit.expire());
        if(!redisRateLimiter.tryAcquire(key, rateLimit.timeout(), TimeUnit.SECONDS)){
            ExceptionUtil.rethrowClientSideException(LIMIT_MESSAGE);
        }
    }

    // ...
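
The getKey method is elided in the listing above. As a rough sketch of how a key might be assembled for each limitType, the code below concatenates the prefix with the IP, user ID, className.methodName, or custom key. Everything in it is an assumption made for illustration: the RequestInfo holder, the buildKey name, and the exact key layout are hypothetical, and expression resolution for CUSTOM keys (such as #{user.id}) is assumed to have happened already.

```java
public class LimitKeySketch {

    enum LimitType { IP, USER, METHOD, CUSTOM }

    /** Hypothetical holder for what the aspect knows about the current call. */
    static final class RequestInfo {
        final String clientIp;
        final String userId;
        final String className;
        final String methodName;

        RequestInfo(String clientIp, String userId, String className, String methodName) {
            this.clientIp = clientIp;
            this.userId = userId;
            this.className = className;
            this.methodName = methodName;
        }
    }

    /** Builds the Redis key: the prefix followed by a type-specific part (assumed layout). */
    static String buildKey(String prefix, LimitType type, String customKey, RequestInfo request) {
        switch (type) {
            case IP:
                return prefix + request.clientIp;
            case USER:
                return prefix + request.userId;
            case CUSTOM:
                return prefix + customKey; // assumed to be already resolved from its expression
            case METHOD:
            default:
                return prefix + request.className + "." + request.methodName;
        }
    }

    public static void main(String[] args) {
        RequestInfo request = new RequestInfo("10.0.0.7", "42", "LimiterController", "testRateLimit");
        System.out.println(buildKey("rateLimit:", LimitType.METHOD, "", request)); // rateLimit:LimiterController.testRateLimit
        System.out.println(buildKey("rateLimit:", LimitType.IP, "", request));     // rateLimit:10.0.0.7
    }
}
```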

Count control

1. Count control class

Limiting the number of visits within a time window can be done using a counter algorithm, aided by the atomicity of Lua script execution.

Lua script logic:

  1. Use the object being limited (such as a method, user ID, or IP) as the key, the current access count as the value, and the time window as the key's expiration time
  2. If the key exists, increment it by 1 and check whether the new value exceeds the access limit. If it does, return 0, meaning the limit for this time window has been reached; otherwise return 1, meaning access is allowed
  3. If the key does not exist, initialize it to 1 and set its expiration time, then return 1, meaning access is allowed
public class RedisCountLimiter {

    private StringRedisTemplate stringRedisTemplate;

    private static final String LUA_SCRIPT = "local c \nc = redis.call('get',KEYS[1]) \nif c and redis.call('incr',KEYS[1]) > tonumber(ARGV[1]) then return 0 end"
            + " \nif c then return 1 else \nredis.call('set', KEYS[1], 1) \nredis.call('expire', KEYS[1], tonumber(ARGV[2])) \nreturn 1 end";

    private static final int SUCCESS_RESULT = 1;
    private static final int FAIL_RESULT = 0;

    public RedisCountLimiter(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * Returns whether access is allowed.
     *
     * @param key    redis key
     * @param limit  maximum number of accesses
     * @param expire time window, in seconds
     * @return true if access is allowed, false otherwise
     * @throws IllegalArgumentException
     */
    public boolean tryAcquire(String key, int limit, int expire) throws IllegalArgumentException {
        RedisScript<Number> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Number.class);
        Number result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), String.valueOf(limit), String.valueOf(expire));
        if (result != null && result.intValue() == SUCCESS_RESULT) {
            return true;
        }
        return false;
    }
}
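
The counting logic of the Lua script can be followed more easily in a single-process form. The sketch below reproduces the same three steps in plain Java with an in-memory map and an explicit clock parameter. It is an illustration only: FixedWindowCounterSketch is a hypothetical name, and it gives none of the cross-process atomicity that running the script inside Redis provides.

```java
import java.util.HashMap;
import java.util.Map;

public class FixedWindowCounterSketch {

    /** Counter plus the time at which its window ends (stands in for a Redis key with a TTL). */
    private static final class Window {
        long count;
        long expiresAtMillis;
    }

    private final Map<String, Window> windows = new HashMap<>();

    public synchronized boolean tryAcquire(String key, int limit, int expireSeconds, long nowMillis) {
        Window window = windows.get(key);
        if (window != null && nowMillis >= window.expiresAtMillis) {
            windows.remove(key); // the window has ended, as if the Redis key had expired
            window = null;
        }
        if (window == null) {
            // Key does not exist: initialize it to 1, set the expiration, allow access
            window = new Window();
            window.count = 1;
            window.expiresAtMillis = nowMillis + expireSeconds * 1000L;
            windows.put(key, window);
            return true;
        }
        // Key exists: increment, then compare the new value with the limit
        window.count++;
        return window.count <= limit;
    }

    public static void main(String[] args) {
        FixedWindowCounterSketch limiter = new FixedWindowCounterSketch();
        System.out.println(limiter.tryAcquire("sms:demo", 2, 10, 0L));      // true: first access opens the window
        System.out.println(limiter.tryAcquire("sms:demo", 2, 10, 1_000L));  // true: second access, still within the limit
        System.out.println(limiter.tryAcquire("sms:demo", 2, 10, 2_000L));  // false: third access exceeds the limit of 2
        System.out.println(limiter.tryAcquire("sms:demo", 2, 10, 10_000L)); // true: the window ended, a new one starts
    }
}
```

Because the window starts at the first access and is never extended, a client can use up the limit at the end of one window and again at the start of the next, which is why this form does not strictly limit the access rate.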

2. Annotation support

The @CountLimit annotation is defined as follows: at most limit accesses are allowed within the period time window. limitType controls the key type, with the same values as in @RateLimit.

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CountLimit {
    String key() default "";
    String prefix() default "countLimit:"; // key prefix
    int limit() default 1;  // maximum number of accesses within the period
    int period() default 1; // time window, in seconds
    LimitType limitType() default LimitType.METHOD;
}

A before advice is likewise used to apply the limit to methods annotated with @CountLimit, as follows:

@Before(value = "@annotation(countLimit)")
public void countLimit(JoinPoint point, CountLimit countLimit) throws Throwable {
    String key = getKey(point, countLimit.limitType(), countLimit.key(), countLimit.prefix());
    if (!redisCountLimiter.tryAcquire(key, countLimit.limit(), countLimit.period())) {
        ExceptionUtil.rethrowClientSideException(LIMIT_MESSAGE);
    }
}

Usage example

1. Add dependencies

<dependencies>
    <dependency>
        <groupId>cn.jboost.springboot</groupId>
        <artifactId>limiter-spring-boot-starter</artifactId>
        <version>1.3-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
</dependencies>

2. Configure Redis

spring:
  application:
    name: limiter-demo
  redis:
    # database index
    database: 0
    host: 192.168.40.92
    port: 6379
    password: password
    # connection timeout
    timeout: 2000

3. Test class

@RestController
@RequestMapping("limiter")
public class LimiterController {

    /**
     * Annotation form
     * @param key
     * @return
     */
    @GetMapping("/count")
    @CountLimit(key = "#{key}", limit = 2, period = 10, limitType = LimitType.CUSTOM)
    public String testCountLimit(@RequestParam("key") String key){
        return "test count limiter...";
    }

    /**
     * Annotation form
     * @param key
     * @return
     */
    @GetMapping("/rate")
    @RateLimit(rate = 1.0 / 5, burst = 5.0, expire = 120, timeout = 0)
    public String testRateLimit(@RequestParam("key") String key){
        return "test rate limiter...";
    }

    @Autowired
    private RedisRateLimiterFactory redisRateLimiterFactory;
    /**
     * Programmatic form
     * @return
     */
    @GetMapping("/rate2")
    public String testRateLimit() {
        RedisRateLimiter limiter = redisRateLimiterFactory.build("LimiterController.testRateLimit", 1.0 / 30, 30, 120);
        if (!limiter.tryAcquire("app.limiter", 0, TimeUnit.SECONDS)) {
            System.out.println(LocalDateTime.now());
            ExceptionUtil.rethrowClientSideException("Your visit is too frequent. Please try again later.");
        }
        return "test rate limiter 2...";
    }
}

4. Verify

Start the test project and open http://localhost:8080/limiter/rate?key=test in a browser. The first access succeeds, as shown in the figure

If you keep refreshing, the following error is returned; after 5 seconds the request succeeds again, so access is limited to once every 5 seconds

Notes on using the annotations

  1. limitType can be IP (client IP address), USER (userId), METHOD (className.methodName), or CUSTOM. The default is METHOD
  2. When limitType is CUSTOM, the key must be specified manually (for the other types the key is derived automatically from the IP, userId, or method name). The key supports expressions such as #{id} and #{user.id}.
  3. To allow only one access per time window, either @CountLimit or @RateLimit can be used. For example, to allow a verification code to be requested only once per minute:
// At most one access per phone number within 60 seconds
@CountLimit(key = "#{params.phone}", limit = 1, period = 60, limitType = LimitType.CUSTOM)
// Add tokens at a rate of 1/60 per second and keep at most 60 seconds' worth (that is, at most one token), which limits access to 1/60 per second (once per minute)
@RateLimit(key = "#{params.phone}", rate = 1.0 / 60, burst = 60, expire = 120, limitType = LimitType.CUSTOM)
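
The effect of rate = 1.0/60 and burst = 60 follows from the RedisPermits constructor shown earlier, where maxPermits is permitsPerSecond times maxBurstSeconds and intervalMillis is 1000 divided by permitsPerSecond. Worked through in exact arithmetic (floating-point rounding aside):

```latex
\begin{aligned}
\text{maxPermits} &= \text{rate} \times \text{burst} = \tfrac{1}{60} \times 60 = 1 \\
\text{intervalMillis} &= \frac{1000}{\text{rate}} = \frac{1000}{1/60} = 60000\ \text{ms}
\end{aligned}
```

So the bucket holds at most one token and gains a new one every 60 seconds, which is what limits each phone number to one request per minute.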

Conclusion

This article introduced rate control based on the RateLimiter token bucket algorithm and count control based on a counter algorithm. Both work in distributed environments and can be applied wherever small and medium-sized projects have such requirements. (Note: this implementation has not been load tested; if user concurrency is high, its behavior needs to be verified.)

  • For the complete code of this article, see github.com/ronwxy/base…, in the directory spring-boot-autoconfigure/src/main/java/cn/jboost/springboot/autoconfig/limiter
  • Example project code address: github.com/ronwxy/spri…

If you found this helpful, don’t forget to give the project a star ^_^. Author's WeChat official account: half rain song. Follow it for more articles.