In the previous article, we noted that fixed-window rate limiting cannot cope with sudden traffic spikes. The token bucket algorithm described in this article handles that scenario much better.

How it works

  1. Tokens are produced and put into the bucket at a constant rate, until the bucket reaches its capacity.
  2. Each request tries to take one or more tokens from the bucket; if it succeeds, the request is processed, otherwise the request is rejected.

Advantages and disadvantages

Advantages

The tokens stored in the bucket act as a traffic buffer, smoothing out sudden bursts.

Disadvantages

The implementation is more complex.

Code implementation

core/limit/tokenlimit.go

In a distributed environment, Redis is used as the storage container for the bucket and its tokens, and a Lua script implements the whole algorithm atomically.

Redis Lua script

-- Token generation rate: number of tokens produced per second
local rate = tonumber(ARGV[1])
-- Bucket capacity
local capacity = tonumber(ARGV[2])
-- Current timestamp
local now = tonumber(ARGV[3])
-- Number of tokens requested
local requested = tonumber(ARGV[4])
-- How many seconds it takes to fill the bucket
local fill_time = capacity/rate
-- TTL is twice the fill time, rounded down
local ttl = math.floor(fill_time*2)
-- Current number of tokens in the bucket
local last_tokens = tonumber(redis.call("get", KEYS[1]))
-- If the key does not exist, this is the first request; default to a full bucket
if last_tokens == nil then
    last_tokens = capacity
end
-- Time of the last refresh
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
-- Default the refresh time to 0 on the first request
if last_refreshed == nil then
    last_refreshed = 0
end
-- Time elapsed since the last request
local delta = math.max(0, now-last_refreshed)
-- Tokens produced since the last request; tokens beyond capacity are discarded
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
-- Whether the bucket holds enough tokens for this request
local allowed = filled_tokens >= requested
-- Tokens remaining in the bucket
local new_tokens = filled_tokens
-- If the request is allowed, deduct the requested tokens
if allowed then
    new_tokens = filled_tokens - requested
end
-- Store the remaining token count
redis.call("setex", KEYS[1], ttl, new_tokens)
-- Store the refresh time
redis.call("setex", KEYS[2], ttl, now)

return allowed
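To make the script's refill arithmetic concrete, here is a small worked example (the numbers are ours, chosen for illustration): with rate=100 tokens/s and capacity=500, the bucket fills in 5 seconds, so the keys expire after 10 seconds of inactivity; a bucket holding 100 tokens that sat idle for 2 seconds refills to 300.

```go
package main

import "fmt"

func main() {
	rate, capacity := 100, 500
	fillTime := capacity / rate // 5 seconds to fill an empty bucket
	ttl := fillTime * 2         // keys expire after 10s of inactivity

	lastTokens, delta := 100, 2 // 100 tokens left, 2s since the last request
	filled := lastTokens + delta*rate
	if filled > capacity { // tokens beyond capacity are discarded
		filled = capacity
	}
	fmt.Println(ttl, filled) // 10 300
}
```

The TTL doubles the fill time so that an idle key safely expires only after the bucket is guaranteed to be full again.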

Token bucket rate limiter definition

type TokenLimiter struct {
    // Token production rate per second
    rate int
    // Bucket capacity
    burst int
    // Storage container
    store *redis.Redis
    // Redis key of the token count
    tokenKey       string
    // Redis key of the bucket refresh timestamp
    timestampKey   string
    // Lock protecting monitorStarted
    rescueLock     sync.Mutex
    // Redis health flag
    redisAlive     uint32
    // In-process token bucket limiter used when redis is down
    rescueLimiter  *xrate.Limiter
    // Whether the redis health probe task has started
    monitorStarted bool
}

func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
    tokenKey := fmt.Sprintf(tokenFormat, key)
    timestampKey := fmt.Sprintf(timestampFormat, key)

    return &TokenLimiter{
        rate:          rate,
        burst:         burst,
        store:         store,
        tokenKey:      tokenKey,
        timestampKey:  timestampKey,
        redisAlive:    1,
        rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
    }
}

Acquiring tokens

func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
    // Check whether redis is healthy;
    // when redis is down, fall back to the in-process limiter
    // so the service can still protect itself
    if atomic.LoadUint32(&lim.redisAlive) == 0 {
        return lim.rescueLimiter.AllowN(now, n)
    }
    // Run the lua script to try to take the tokens
    resp, err := lim.store.Eval(
        script,
        []string{
            lim.tokenKey,
            lim.timestampKey,
        },
        []string{
            strconv.Itoa(lim.rate),
            strconv.Itoa(lim.burst),
            strconv.FormatInt(now.Unix(), 10),
            strconv.Itoa(n),
        })
    // redis allowed == false
    // Lua boolean false -> redis Nil bulk reply
    // Special-case the key not existing
    if err == redis.Nil {
        return false
    } else if err != nil {
        logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
        // Start the redis health probe task and
        // fall back to the in-process limiter
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    code, ok := resp.(int64)
    if !ok {
        logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
        lim.startMonitor()
        return lim.rescueLimiter.AllowN(now, n)
    }

    // redis allowed == true
    // Lua boolean true -> redis integer reply with value 1
    return code == 1
}

Redis fail-safe strategy

The fail-safe strategy is carefully designed: when Redis becomes unavailable, a standalone in-process rate limiter takes over, so basic rate limiting still works and the service is not overwhelmed.

// Start the redis health probe
func (lim *TokenLimiter) startMonitor() {
    lim.rescueLock.Lock()
    defer lim.rescueLock.Unlock()
    // Prevent starting the probe more than once
    if lim.monitorStarted {
        return
    }

    // Set the task flag and mark redis as down
    lim.monitorStarted = true
    atomic.StoreUint32(&lim.redisAlive, 0)
    // Run the health probe in the background
    go lim.waitForRedis()
}

// Redis health probe task
func (lim *TokenLimiter) waitForRedis() {
    ticker := time.NewTicker(pingInterval)
    // Clean up when the health probe succeeds
    defer func() {
        ticker.Stop()
        lim.rescueLock.Lock()
        lim.monitorStarted = false
        lim.rescueLock.Unlock()
    }()

    for range ticker.C {
        // ping is redis' built-in health probe command
        if lim.store.Ping() {
            // The probe succeeded; mark redis as alive again
            atomic.StoreUint32(&lim.redisAlive, 1)
            return
        }
    }
}

Project address

Github.com/zeromicro/g…

Welcome to use go-zero and star it to support us!

WeChat group

Follow the "Microservice Practice" WeChat official account and click "exchange group" to get the community group QR code.