In the previous article, it was mentioned that the flow limiting of fixed time window could not deal with the sudden request flood peak. The token bucket line algorithm described in this paper can better deal with this scenario.
The working principle of
- The production token is put into the bucket at a certain rate and at a constant speed per unit time until the bucket capacity reaches the upper limit.
- Process the request, each time you try to get one or more tokens, process the request if you get one, and reject the request if you fail.
The advantages and disadvantages
advantages
The token of the bucket memory can be used as a traffic buffer to smooth sudden traffic.
disadvantages
The implementation is complicated.
Code implementation
core/limit/tokenlimit.go
In distributed environment, redis is considered as the storage container of buckets and tokens, and lua script is used to realize the whole algorithm process.
Redis lua script
The number of tokens generated per second is the token generation speed
local rate = tonumber(ARGV[1])
- bucket capacity
local capacity = tonumber(ARGV[2])
-- Current timestamp
local now = tonumber(ARGV[3])
-- Number of tokens currently requested
local requested = tonumber(ARGV[4])
-- How many seconds it takes to fill the bucket
local fill_time = capacity/rate
-- Round down, TTL is twice the filled time
local ttl = math.floor(fill_time*2)
- Current bucket capacity
local last_tokens = tonumber(redis.call("get", KEYS[1]))
- If the current capacity of the bucket is 0, it is the first time to enter the bucket. The default capacity is the maximum capacity of the bucket
if last_tokens == nil then
last_tokens = capacity
end
-- The time of the last refresh
local last_refreshed = tonumber(redis.call("get", KEYS[2]))
-- Set the refresh time to 0 on first entry
if last_refreshed == nil then
last_refreshed = 0
end
-- Time span since last request
local delta = math.max(0, now-last_refreshed)
-- The time span since the last request, the total number of tokens that can be produced. If the maximum capacity is exceeded, the excess tokens will be discarded
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
- Whether the number of tokens requested is sufficient
local allowed = filled_tokens >= requested
-- Number of buckets remaining
local new_tokens = filled_tokens
-- Allow this token application and calculate the remaining number
if allowed then
new_tokens = filled_tokens - requested
end
-- Set the number of remaining tokens
redis.call("setex", KEYS[1], ttl, new_tokens)
-- Set the refresh time
redis.call("setex", KEYS[2], ttl, now)
return allowed
Copy the code
Token bucket current limiter definition
type TokenLimiter struct {
// Production rate per second
rate int
/ / bucket capacity
burst int
// Storage container
store *redis.Redis
// redis key
tokenKey string
// Bucket refresh time key
timestampKey string
// lock
rescueLock sync.Mutex
// Redis Health identifier
redisAlive uint32
// Use the in-process token bucket limiter when redis fails
rescueLimiter *xrate.Limiter
// Redis monitors the id of the probe task
monitorStarted bool
}
func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
tokenKey := fmt.Sprintf(tokenFormat, key)
timestampKey := fmt.Sprintf(timestampFormat, key)
return &TokenLimiter{
rate: rate,
burst: burst,
store: store,
tokenKey: tokenKey,
timestampKey: timestampKey,
redisAlive: 1,
rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
}
}
Copy the code
The access token
func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
// Determine if Redis is healthy
// When redis fails, use an in-process current limiter
// You can save the day
if atomic.LoadUint32(&lim.redisAlive) == 0 {
return lim.rescueLimiter.AllowN(now, n)
}
// Execute the script to get the token
resp, err := lim.store.Eval(
script,
[]string{
lim.tokenKey,
lim.timestampKey,
},
[]string{
strconv.Itoa(lim.rate),
strconv.Itoa(lim.burst),
strconv.FormatInt(now.Unix(), 10),
strconv.Itoa(n),
})
// redis allowed == false
// Lua boolean false -> r Nil bulk reply
// Special handle key does not exist
if err == redis.Nil {
return false
} else iferr ! =nil {
logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
// Start the Redis health detection task
// Use an in-process current limiter as a backpocket
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
code, ok := resp.(int64)
if! ok { logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
lim.startMonitor()
return lim.rescueLimiter.AllowN(now, n)
}
// redis allowed == true
// Lua boolean true -> r integer reply with value of 1
return code == 1
}
Copy the code
Redis fail-safe strategy
The bottom of the line strategy is designed in great detail. When Redis is unavailable, start the standalone version of Ratelimit as a backup limiting to ensure that the basic limiting is available and the service is not washed out.
// Enable Redis health probe
func (lim *TokenLimiter) startMonitor(a) {
lim.rescueLock.Lock()
defer lim.rescueLock.Unlock()
// Prevent repeated opening
if lim.monitorStarted {
return
}
// Set task and health flags
lim.monitorStarted = true
atomic.StoreUint32(&lim.redisAlive, 0)
// Health detection
go lim.waitForRedis()
}
// Redis health probe scheduled task
func (lim *TokenLimiter) waitForRedis(a) {
ticker := time.NewTicker(pingInterval)
// This function is called back when the health probe succeeds
defer func(a) {
ticker.Stop()
lim.rescueLock.Lock()
lim.monitorStarted = false
lim.rescueLock.Unlock()
}()
for range ticker.C {
// ping belongs to the redis built-in health probe command
if lim.store.Ping() {
// The health detection succeeds. The health id is set
atomic.StoreUint32(&lim.redisAlive, 1)
return}}}Copy the code
The project address
Github.com/zeromicro/g…
Welcome to Go-Zero and star support us!
Wechat communication group
Pay attention to the public account of “micro-service Practice” and click on the exchange group to obtain the QR code of the community group.