juju/ratelimit source code analysis
Apart from its tests, the token bucket in juju/ratelimit is implemented using only the standard library.
A brief introduction to token buckets
A token bucket is a rate-limiting algorithm. Besides limiting traffic, it can also absorb sudden bursts of traffic to a certain extent.
Introduction to rate limiting
In the age of the Internet of Everything, a service depends on many external services and is in turn depended on by many others. When other services depend on it, the current service is the service provider.
In what follows, the calling side is referred to as the service consumer and the called side as the service provider.
In the era of distributed systems and microservices, high availability has been emphasized again and again. Setting aside the robustness of the program itself, sudden traffic surges have been a recurring high-availability problem in recent years. Two kinds of traffic in particular have been studied: sudden surges and abnormal traffic.
- Rate limiting: as a provider, limit traffic that exceeds the service's own load capacity.
- Circuit breaking: as a consumer, break the circuit to protect against abnormal traffic.
This article covers only rate limiting; circuit breaking is left for the next article.
Common rate-limiting algorithms
The algorithms commonly used for rate limiting are counters, sliding windows, leaky buckets, and the subject of this article: token buckets.
Each algorithm has its own advantages and disadvantages. Compared with the others, the token bucket has the advantage of tolerating sudden bursts of traffic. Once the tokens in the bucket are exhausted, it degrades into behaving like a leaky bucket.
How token buckets are implemented
A token bucket stores tokens. Each incoming request takes a token from the bucket. When the bucket has no tokens left, the current request is rate limited, and how it is handled depends on the requirements. The most common approach is to reject the request outright, which is the most effective and the bluntest option, but in some core scenarios (for example, payment) a request cannot simply be rejected.
Tokens are added to the bucket at a fixed interval and in a fixed quantity. In the long run, the rate at which tokens are added is the maximum sustained load; under normal conditions, the tokens remaining in the bucket determine the size of the traffic burst that can be absorbed.
From the above, the core of a token bucket is keeping count of the tokens left in the bucket. The factors affecting the number of remaining tokens are as follows:
- Bucket capacity: when tokens are consumed more slowly than they are produced, the bucket fills up and the number of remaining tokens equals the capacity.
- Token production rate: determined by the interval between additions and the number of tokens added each time.
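The interaction between capacity and production rate can be illustrated with a minimal sketch. The names `simpleBucket`, `refill`, `allow`, and `burst` are hypothetical and exist only for this illustration; this is not the juju/ratelimit implementation, and it ignores time and concurrency entirely.

```go
package main

import "fmt"

// simpleBucket is a hypothetical, illustration-only token bucket.
// It is not the juju/ratelimit implementation and is not safe for
// concurrent use.
type simpleBucket struct {
	capacity  int64 // maximum number of tokens the bucket can hold
	quantum   int64 // tokens added per fill interval
	available int64 // tokens currently in the bucket
}

// refill adds quantum tokens for each elapsed interval, capped at capacity.
func (b *simpleBucket) refill(intervals int64) {
	b.available += intervals * b.quantum
	if b.available > b.capacity {
		b.available = b.capacity
	}
}

// allow consumes one token if one is available and reports whether the
// request may proceed.
func (b *simpleBucket) allow() bool {
	if b.available <= 0 {
		return false
	}
	b.available--
	return true
}

// burst sends n requests at once and returns how many were admitted.
func burst(b *simpleBucket, n int) int {
	admitted := 0
	for i := 0; i < n; i++ {
		if b.allow() {
			admitted++
		}
	}
	return admitted
}

func main() {
	b := &simpleBucket{capacity: 10, quantum: 2, available: 10}
	fmt.Println(burst(b, 15)) // 10: a full bucket absorbs 10 of the 15 requests
	b.refill(3)               // three intervals later: 3 * 2 = 6 new tokens
	fmt.Println(burst(b, 15)) // 6: only the refilled tokens can be spent
}
```

In this sketch the capacity bounds the burst that can be absorbed, while the quantum per interval bounds what can be admitted over time.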
Source code analysis
Structure analysis
The package consists of three files: ratelimit.go implements the token bucket, reader.go wraps the token bucket, and the last one contains the tests.
Analysis of token bucket implementations
Any library that depends on the passage of time should define an interface that can be stubbed, so that tests can control the clock:
type Clock interface {
	// Now returns the current time.
	Now() time.Time
	// Sleep blocks for at least the given duration.
	Sleep(d time.Duration)
}

// realClock implements Clock on top of the standard time package; it is not exported.
type realClock struct{}

func (realClock) Now() time.Time {
	return time.Now()
}

func (realClock) Sleep(d time.Duration) {
	time.Sleep(d)
}
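One benefit of the Clock interface is that a test can substitute a clock that never really sleeps. The following is a minimal sketch of such a stub. `fakeClock` and `simulateSleeps` are hypothetical names introduced for illustration and are not taken from the library's test file.

```go
package main

import (
	"fmt"
	"time"
)

// Clock mirrors the interface shown above.
type Clock interface {
	Now() time.Time
	Sleep(d time.Duration)
}

// fakeClock is a hypothetical test double: Sleep advances the stored time
// instantly instead of blocking. It is not safe for concurrent use.
type fakeClock struct {
	now time.Time
}

func (c *fakeClock) Now() time.Time { return c.now }

func (c *fakeClock) Sleep(d time.Duration) { c.now = c.now.Add(d) }

// simulateSleeps sleeps n times for the given number of seconds on a
// fakeClock and returns the number of seconds the clock reports as elapsed.
func simulateSleeps(n int, seconds int64) int64 {
	var c Clock = &fakeClock{now: time.Unix(0, 0)}
	start := c.Now()
	for i := 0; i < n; i++ {
		c.Sleep(time.Duration(seconds) * time.Second)
	}
	return int64(c.Now().Sub(start) / time.Second)
}

func main() {
	// Three simulated one-hour sleeps complete without any real waiting.
	fmt.Println(simulateSleeps(3, 3600)) // 10800
}
```

Because the bucket only talks to its clock through the interface, code that would otherwise wait for hours can be exercised in a test that returns immediately.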
Tokens are added at a fixed interval, regardless of when they are taken, so the bucket's clock advances in ticks of that fixed interval.
Every time value involved is converted to a tick count to simplify the calculation.
func (tb *Bucket) currentTick(now time.Time) int64 {
	return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
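As a worked example of the tick conversion, the sketch below repeats the same integer division outside the Bucket type. `tickAt` and `tickAfterMillis` are hypothetical helper names, and the 100 ms fill interval is an assumed value chosen for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// tickAt repeats the integer division used by currentTick: the number of
// whole fill intervals that have elapsed since the start time.
func tickAt(start, now time.Time, fillInterval time.Duration) int64 {
	return int64(now.Sub(start) / fillInterval)
}

// tickAfterMillis is a convenience wrapper that expresses the elapsed time
// and the fill interval in milliseconds.
func tickAfterMillis(elapsedMs, intervalMs int64) int64 {
	start := time.Unix(0, 0)
	now := start.Add(time.Duration(elapsedMs) * time.Millisecond)
	return tickAt(start, now, time.Duration(intervalMs)*time.Millisecond)
}

func main() {
	fmt.Println(tickAfterMillis(99, 100))  // 0: the first interval is not yet complete
	fmt.Println(tickAfterMillis(100, 100)) // 1
	fmt.Println(tickAfterMillis(250, 100)) // 2: the partial interval is truncated
}
```

The truncation means that tokens only appear on interval boundaries, never partway through an interval.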
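The bucket stores its start time and the most recently computed tick:
type Bucket struct {
	clock        Clock
	startTime    time.Time     // time the bucket was created; ticks are counted from here
	capacity     int64         // maximum number of tokens the bucket can hold
	quantum      int64         // tokens added on every tick
	fillInterval time.Duration // length of one tick
	mu           sync.Mutex    // guards the two fields below
	availableTokens int64      // remaining tokens; negative when future tokens have been consumed
	latestTick      int64      // tick at which availableTokens was last updated
}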
latestTick is updated each time a token-related method exposed by the bucket is called:
// adjustavailableTokens updates the number of remaining tokens. It is the only
// place where the token count is recalculated. tick is the tick corresponding
// to the current time, i.e. currentTick(now).
func (tb *Bucket) adjustavailableTokens(tick int64) {
	lastTick := tb.latestTick
	tb.latestTick = tick
	// If the bucket is already full, the token count needs no update.
	if tb.availableTokens >= tb.capacity {
		return
	}
	// Add the tokens produced since the last tick, capping at the bucket capacity.
	tb.availableTokens += (tick - lastTick) * tb.quantum
	if tb.availableTokens > tb.capacity {
		tb.availableTokens = tb.capacity
	}
}
Turning the refill into arithmetic makes it much cheaper. Instead of updating the bucket on a timer at every interval, the bucket is updated only when one of its methods is called: the tick difference multiplied by quantum gives the number of tokens to add.
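To make the equivalence concrete, the sketch below compares the one-step calculation with a simulated per-tick timer. `lazyRefill` and `eagerRefill` are hypothetical standalone functions written for this comparison; only the arithmetic in `lazyRefill` follows adjustavailableTokens.

```go
package main

import "fmt"

// lazyRefill computes the new token count in one step from the tick
// difference, following the arithmetic of adjustavailableTokens.
func lazyRefill(available, capacity, quantum, lastTick, tick int64) int64 {
	if available >= capacity {
		return available
	}
	available += (tick - lastTick) * quantum
	if available > capacity {
		available = capacity
	}
	return available
}

// eagerRefill simulates a timer that fires once per tick and adds quantum
// tokens each time, capping at capacity.
func eagerRefill(available, capacity, quantum, lastTick, tick int64) int64 {
	for t := lastTick; t < tick; t++ {
		if available < capacity {
			available += quantum
			if available > capacity {
				available = capacity
			}
		}
	}
	return available
}

func main() {
	fmt.Println(lazyRefill(-4, 10, 3, 5, 8))   // 5: -4 + 3 ticks * 3 tokens
	fmt.Println(eagerRefill(-4, 10, 3, 5, 8))  // 5: same result after three timer steps
	fmt.Println(lazyRefill(2, 10, 3, 0, 1000)) // 10: capped at capacity
}
```

The lazy version does constant work no matter how many ticks have passed, and an idle bucket costs nothing.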
With the core of the token bucket in place, what remains is to build out the exported API.
// Capacity returns the bucket capacity.
func (tb *Bucket) Capacity() int64 {
	return tb.capacity
}

// Rate returns the number of tokens added per second, which maps to QPS in business terms.
func (tb *Bucket) Rate() float64 {
	return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}
The factor 1e9 comes from the definition of time.Duration:
type Duration int64

const (
	Nanosecond  Duration = 1
	Microsecond          = 1000 * Nanosecond
	Millisecond          = 1000 * Microsecond
	Second               = 1000 * Millisecond
	Minute               = 60 * Second
	Hour                 = 60 * Minute
)
As shown above, one second is 1e9 nanoseconds, and Duration is a defined type whose underlying type is int64.
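A short worked example of the Rate formula, using hypothetical standalone helpers (`ratePerSecond`, `rateForMillis`) and assumed values for quantum and fill interval:

```go
package main

import (
	"fmt"
	"time"
)

// ratePerSecond applies the same formula as Rate: tokens per interval,
// scaled from nanoseconds to seconds.
func ratePerSecond(quantum int64, fillInterval time.Duration) float64 {
	return 1e9 * float64(quantum) / float64(fillInterval)
}

// rateForMillis is a convenience wrapper that takes the fill interval in
// milliseconds.
func rateForMillis(quantum, intervalMs int64) float64 {
	return ratePerSecond(quantum, time.Duration(intervalMs)*time.Millisecond)
}

func main() {
	fmt.Println(rateForMillis(5, 100))  // 50: five tokens every 100ms
	fmt.Println(rateForMillis(1, 2000)) // 0.5: one token every two seconds
}
```

Because a Duration is a count of nanoseconds, dividing by it directly would give tokens per nanosecond, which is why the result is multiplied by 1e9.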
Query the number of remaining tokens:
func (tb *Bucket) Available() int64 {
	return tb.available(tb.clock.Now())
}

func (tb *Bucket) available(now time.Time) int64 {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	// currentTick converts the current time to a tick;
	// adjustavailableTokens then refreshes the token count for that tick.
	tb.adjustavailableTokens(tb.currentTick(now))
	return tb.availableTokens
}
Consume the specified number of tokens; if there are not enough, consume only whatever remains:
func (tb *Bucket) TakeAvailable(count int64) int64 {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	return tb.takeAvailable(tb.clock.Now(), count)
}

func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
	if count <= 0 {
		return 0
	}
	tb.adjustavailableTokens(tb.currentTick(now))
	if tb.availableTokens <= 0 {
		return 0
	}
	if count > tb.availableTokens {
		count = tb.availableTokens
	}
	tb.availableTokens -= count
	return count
}
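Rejecting a request when no token is available, the common handling mentioned earlier, could look roughly like the following HTTP middleware sketch. The `taker` interface, the `fixedTokens` stand-in, and the `limit` and `statuses` helpers are hypothetical names introduced for illustration; only the `TakeAvailable(count int64) int64` signature is taken from the code above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// taker is the subset of the bucket API that this sketch relies on.
type taker interface {
	TakeAvailable(count int64) int64
}

// fixedTokens is a hypothetical stand-in for a bucket that never refills.
// It is not safe for concurrent use; a real bucket guards its state with a mutex.
type fixedTokens struct{ left int64 }

func (f *fixedTokens) TakeAvailable(count int64) int64 {
	if count <= 0 || f.left <= 0 {
		return 0
	}
	if count > f.left {
		count = f.left
	}
	f.left -= count
	return count
}

// limit rejects a request with 429 Too Many Requests when no token can be taken.
func limit(b taker, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if b.TakeAvailable(1) == 0 {
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// statuses sends n requests through the limiter and collects the status codes.
func statuses(tokens int64, n int) []int {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	h := limit(&fixedTokens{left: tokens}, ok)
	codes := make([]int, 0, n)
	for i := 0; i < n; i++ {
		rec := httptest.NewRecorder()
		h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
		codes = append(codes, rec.Code)
	}
	return codes
}

func main() {
	fmt.Println(statuses(2, 3)) // [200 200 429]
}
```

Since the middleware depends only on the small `taker` interface, a value with a matching TakeAvailable method, such as the Bucket described here, should be usable in place of the stand-in.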
There is another way to consume tokens: request a specified number of tokens together with a maximum wait time. If the tokens cannot become available within that time, the call reports failure immediately and consumes no tokens, leaving further handling to the business logic. If the tokens can become available within the wait time, they are deducted right away, which may drive the remaining token count negative.
func (tb *Bucket) TakeMaxDuration(count int64,
	maxWait time.Duration) (time.Duration, bool) {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	return tb.take(tb.clock.Now(), count, maxWait)
}

func (tb *Bucket) take(now time.Time, count int64,
	maxWait time.Duration) (time.Duration, bool) {
	// Validate the parameter, then refresh the remaining token count.
	if count <= 0 {
		return 0, true
	}
	tick := tb.currentTick(now)
	tb.adjustavailableTokens(tick)
	avail := tb.availableTokens - count
	if avail >= 0 {
		// Enough tokens are left, so consume them with no wait.
		tb.availableTokens = avail
		return 0, true
	}
	// Not enough tokens: round the deficit up to whole ticks and
	// compute how long it takes for that many tokens to be produced.
	endTick := tick + (-avail+tb.quantum-1)/tb.quantum
	endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
	waitTime := endTime.Sub(now)
	if waitTime > maxWait {
		// The maximum wait time would be exceeded, so report failure.
		return 0, false
	}
	// Enough tokens will be produced within the wait time.
	// The remaining count goes negative: future tokens are consumed,
	// so this should only be used in scenarios that call for it.
	tb.availableTokens = avail
	return waitTime, true
}
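The wait-time calculation rests on an integer ceiling division, which the following sketch isolates. `ticksToWait` and `waitMillis` are hypothetical helpers, and times are expressed as milliseconds since the bucket's start time purely to keep the arithmetic easy to follow.

```go
package main

import "fmt"

// ticksToWait returns how many whole ticks must pass before a deficit of
// tokens is covered, using the same ceiling division as take.
func ticksToWait(deficit, quantum int64) int64 {
	return (deficit + quantum - 1) / quantum
}

// waitMillis computes how long to wait until the deficit is covered.
// nowMs is the current time in milliseconds since the bucket's start time.
func waitMillis(nowMs, intervalMs, deficit, quantum int64) int64 {
	tick := nowMs / intervalMs
	endTick := tick + ticksToWait(deficit, quantum)
	return endTick*intervalMs - nowMs
}

func main() {
	fmt.Println(ticksToWait(7, 3)) // 3: two ticks yield only 6 tokens
	fmt.Println(ticksToWait(6, 3)) // 2: an exact multiple needs no extra tick
	// At 250ms with a 100ms interval the current tick is 2, so the deficit
	// of 7 is covered at tick 5 (500ms), which is 250ms away.
	fmt.Println(waitMillis(250, 100, 7, 3)) // 250
}
```

The wait is measured to a tick boundary rather than from the current instant, so it can be shorter than the number of ticks multiplied by the interval.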
The package also provides a more commonly used function, Take. Its maximum wait time is an extremely large value, so when there are not enough tokens it always consumes future tokens:
const infinityDuration time.Duration = 0x7fffffffffffffff
Take is a special case of TakeMaxDuration.
The last function, Wait, is built on Take: if the call had to consume future tokens, it sleeps until those tokens have been produced.
func (tb *Bucket) Wait(count int64) {
	if d := tb.Take(count); d > 0 {
		tb.clock.Sleep(d)
	}
}
The rest of the code consists of various forms of constructors, which I won't go into.