“This is my 38th day of participating in the First Challenge 2022. For details: First Challenge 2022”

juju/ratelimit source code analysis

Apart from its tests, this token bucket implementation relies only on the standard library.

A brief introduction to token buckets

A token bucket is a rate-limiting algorithm that not only caps traffic but also tolerates sudden bursts to some degree.

Introduction to rate limiting

In an age where everything is connected, a service both depends on and is depended on by many external services. When external services depend on it, the current service is the service provider.

In what follows, "consumer" means the service consumer and "provider" means the service provider.

With the rise of distributed systems and microservices, high availability has been emphasized again and again. Setting aside the robustness of the program itself, sudden traffic surges have been a recurring high-availability concern in recent years, and both traffic spikes and abnormal traffic have been studied. Two common protections are:

  • Rate limiting: as a provider, limit traffic that exceeds the service's load capacity.
  • Circuit breaking: as a consumer, cut off calls to protect against abnormal traffic.

This article covers only rate limiting; the next article covers circuit breaking.

Common rate-limiting algorithms

The algorithms commonly used for rate limiting are counters, sliding windows, leaky buckets, and the subject of this article: token buckets.

Each algorithm has its own advantages and disadvantages. Compared with the others, a token bucket has the advantage of absorbing sudden bursts of traffic. Once the tokens in the bucket are exhausted, it degenerates into a leaky bucket.

How token buckets are implemented

A token bucket stores tokens. Each incoming request takes a token from the bucket; when the bucket is empty, the request is rate limited. How a limited request is handled depends on the use case. The most common response is to reject it outright, which is the most effective but also the bluntest approach; in some core scenarios (e.g., payment), requests cannot simply be rejected.

Tokens are added to the bucket at a fixed interval and in a fixed quantity. In the long run, the rate at which tokens are added is the maximum sustained load, while the tokens left in the bucket at any moment are the burst of traffic that can be absorbed.

As described above, the core of a token bucket is tracking the number of tokens left in the bucket. Two factors affect that number:

  1. Bucket capacity. When tokens are consumed more slowly than they are produced, the bucket fills up and the capacity is the number of remaining tokens.
  2. The token production rate, which is determined by the fill interval and the number of tokens added per interval.
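The two factors above can be illustrated with a little arithmetic. The following is a minimal sketch, not code from the library: `maxAdmitted` is a hypothetical helper, and it assumes the bucket starts full, every request costs one token, and `quantum` does not exceed `capacity`.

```go
package main

import (
	"fmt"
	"time"
)

// maxAdmitted returns an upper bound on the requests a token bucket can admit
// during window, assuming the bucket starts full, each request costs one
// token, and quantum <= capacity. All names here are illustrative.
func maxAdmitted(capacity, quantum int64, fillInterval, window time.Duration) int64 {
	refills := int64(window / fillInterval) // completed fill intervals
	return capacity + refills*quantum       // burst + sustained production
}

func main() {
	// Capacity 100, one token every 10ms (100 tokens/s sustained).
	fmt.Println(maxAdmitted(100, 1, 10*time.Millisecond, time.Second))    // 200
	fmt.Println(maxAdmitted(100, 1, 10*time.Millisecond, 10*time.Second)) // 1100
}
```

Over a short window the capacity (the burst) dominates; over a long window the production rate dominates, which is why the fill rate is the long-run load limit.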

Source code analysis

Structure analysis

The package consists of three files: ratelimit.go implements the token bucket, reader.go wraps the token bucket, and the last one is the test file.

Analysis of token bucket implementations

Any library whose behavior depends on the passage of time should expose an interface that can be stubbed out, so that it is easy to test:

type Clock interface {
  Now() time.Time
  Sleep(d time.Duration)
}

// realClock is the real clock backed by the time package; it is not exported
type realClock struct{}
func (realClock) Now() time.Time {
  return time.Now()
}
func (realClock) Sleep(d time.Duration) {
  time.Sleep(d)
}
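To show why this interface helps with testing, here is a minimal sketch of a fake clock. `fakeClock`, `newFakeClock`, and `sleepAndMeasure` are hypothetical names introduced for illustration; they are not taken from the library.

```go
package main

import (
	"fmt"
	"time"
)

// Clock mirrors the interface shown above.
type Clock interface {
	Now() time.Time
	Sleep(d time.Duration)
}

// fakeClock is a hypothetical test double: Sleep advances the clock
// instantly instead of blocking.
type fakeClock struct {
	now time.Time
}

func (c *fakeClock) Now() time.Time        { return c.now }
func (c *fakeClock) Sleep(d time.Duration) { c.now = c.now.Add(d) }

// newFakeClock returns a fake clock pinned to an arbitrary fixed instant.
func newFakeClock() *fakeClock {
	return &fakeClock{now: time.Date(2022, time.January, 1, 0, 0, 0, 0, time.UTC)}
}

// sleepAndMeasure reports how much time passed on c while sleeping for d.
func sleepAndMeasure(c Clock, d time.Duration) time.Duration {
	start := c.Now()
	c.Sleep(d)
	return c.Now().Sub(start)
}

func main() {
	// An hour "passes" without the program actually waiting.
	fmt.Println(sleepAndMeasure(newFakeClock(), time.Hour)) // 1h0m0s
}
```

With a clock like this injected, a test can move time forward deterministically and check token counts without any real delay.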

Tokens are added at a fixed interval, regardless of when they are taken, so the bucket's clock effectively ticks once per fill interval.

Every time value involved is converted to a tick number to simplify the arithmetic.

func (tb *Bucket) currentTick(now time.Time) int64 {
  return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
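A few concrete values make the truncation in this conversion visible. This is a sketch only: `tickAt` is a hypothetical free function that takes the elapsed time directly instead of reading `startTime` from a bucket.

```go
package main

import (
	"fmt"
	"time"
)

// tickAt converts the time elapsed since the bucket's start into a tick
// number, using the same truncating division as currentTick.
func tickAt(elapsed, fillInterval time.Duration) int64 {
	return int64(elapsed / fillInterval)
}

func main() {
	interval := 100 * time.Millisecond
	fmt.Println(tickAt(99*time.Millisecond, interval))  // 0: first interval not finished
	fmt.Println(tickAt(100*time.Millisecond, interval)) // 1
	fmt.Println(tickAt(350*time.Millisecond, interval)) // 3: the partial interval is dropped
}
```

Because the division truncates, a partially elapsed interval never contributes tokens.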

The bucket stores its start time and the tick at which it was last updated:

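type Bucket struct {
  clock Clock                // time source; can be replaced in tests
  startTime time.Time        // when the bucket was created; ticks are counted from here
  capacity int64             // maximum number of tokens the bucket can hold
  quantum int64              // tokens added on every tick
  fillInterval time.Duration // length of one tick
  mu sync.Mutex              // guards the fields below
  availableTokens int64      // tokens currently available; can go negative
  latestTick int64           // tick at which availableTokens was last updated
}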

latestTick is updated every time one of the bucket's exported methods is called:

// adjustavailableTokens updates the number of remaining tokens.
// It is the only place where tokens are added to the bucket.
// tick is the tick for the current time, i.e. currentTick(now).
func (tb *Bucket) adjustavailableTokens(tick int64) {
  lastTick := tb.latestTick
  tb.latestTick = tick
  // If the bucket is already full, there is nothing to add
  if tb.availableTokens >= tb.capacity {
    return
  }
  // Add the tokens produced since the last update, capping at the bucket capacity
  tb.availableTokens += (tick - lastTick) * tb.quantum
  if tb.availableTokens > tb.capacity {
    tb.availableTokens = tb.capacity
  }
}

Turning the problem into arithmetic makes it much cheaper. Instead of a timer updating the bucket at every interval, the bucket is updated only when one of its methods is called: the tick difference multiplied by quantum gives the number of tokens to add.
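The effect of refilling on demand can be seen in a stripped-down sketch. `lazyBucket` is a hypothetical, non-thread-safe type written for illustration; it keeps only the fields the refill step needs.

```go
package main

import "fmt"

// lazyBucket is a stripped-down, non-thread-safe sketch of lazy refilling.
type lazyBucket struct {
	capacity, quantum int64
	available         int64
	latestTick        int64
}

// refill credits the tokens produced between latestTick and tick in one step.
func (b *lazyBucket) refill(tick int64) {
	elapsed := tick - b.latestTick
	b.latestTick = tick
	if b.available >= b.capacity {
		return
	}
	b.available += elapsed * b.quantum
	if b.available > b.capacity {
		b.available = b.capacity
	}
}

func main() {
	b := &lazyBucket{capacity: 10, quantum: 2, available: 1}
	b.refill(3)              // 3 ticks * 2 tokens are credited at once
	fmt.Println(b.available) // 7
	b.refill(1000)           // a long idle period is still one multiplication
	fmt.Println(b.available) // 10 (capped at capacity)
}
```

No background goroutine or timer is needed: an idle bucket costs nothing, and catching up after any idle period is a constant-time operation.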

With the core of the token bucket in place, what remains is to build out the exported API.

// Capacity returns the bucket capacity
func (tb *Bucket) Capacity() int64 {
  return tb.capacity
}
// Rate returns the number of tokens added per second, which maps to QPS in business terms
func (tb *Bucket) Rate() float64 {
  return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}

The 1e9 factor comes from the definition of time.Duration:

type Duration int64
const (
  Nanosecond  Duration = 1
  Microsecond          = 1000 * Nanosecond
  Millisecond          = 1000 * Microsecond
  Second               = 1000 * Millisecond
  Minute               = 60 * Second
  Hour                 = 60 * Minute
)

As shown above, one second is 1e9 nanoseconds, and Duration is a new type defined on int64.
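The rate formula can be checked with concrete numbers. In this sketch, `ratePerSecond` is a hypothetical free function that repeats the same arithmetic, writing the 1e9 factor as `float64(time.Second)` to make the equivalence explicit.

```go
package main

import (
	"fmt"
	"time"
)

// ratePerSecond repeats the arithmetic of Rate as a free function.
// float64(time.Second) is the same value as the 1e9 literal.
func ratePerSecond(quantum int64, fillInterval time.Duration) float64 {
	return float64(time.Second) * float64(quantum) / float64(fillInterval)
}

func main() {
	fmt.Println(ratePerSecond(5, 10*time.Millisecond)) // 500
	fmt.Println(ratePerSecond(1, 2*time.Second))       // 0.5
}
```

Five tokens every 10ms is 500 tokens per second, and one token every two seconds is half a token per second, which is why the return type is float64.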

Query the number of remaining tokens:

func (tb *Bucket) Available() int64 {
  return tb.available(tb.clock.Now())
}
func (tb *Bucket) available(now time.Time) int64 {
  tb.mu.Lock()
  defer tb.mu.Unlock()
  // currentTick computes the tick for the current time;
  // adjustavailableTokens then refreshes the remaining token count
  tb.adjustavailableTokens(tb.currentTick(now))
  return tb.availableTokens
}

Consume the specified number of tokens or, if there are not enough, all that remain:

func (tb *Bucket) TakeAvailable(count int64) int64 {
  tb.mu.Lock()
  defer tb.mu.Unlock()
  return tb.takeAvailable(tb.clock.Now(), count)
}
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
  if count <= 0 {
    return 0
  }
  tb.adjustavailableTokens(tb.currentTick(now))
  if tb.availableTokens <= 0 {
    return 0
  }
  if count > tb.availableTokens {
    count = tb.availableTokens
  }
  tb.availableTokens -= count
  return count
}
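A non-blocking take of this kind is a natural fit for the rejection strategy described earlier. The sketch below is an assumption about how a caller might use it, not code from the library: `taker`, `fixedTokens`, and `statusFor` are hypothetical names, and `fixedTokens` is a stand-in that never refills and is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"net/http"
)

// taker is the one method this sketch needs; the Bucket in the walkthrough
// above has a method with this signature.
type taker interface {
	TakeAvailable(count int64) int64
}

// fixedTokens is a hypothetical stand-in with no refill, for demonstration.
type fixedTokens struct{ left int64 }

func (f *fixedTokens) TakeAvailable(count int64) int64 {
	if count > f.left {
		count = f.left
	}
	f.left -= count
	return count
}

// statusFor decides the HTTP status for one request costing one token.
func statusFor(t taker) int {
	if t.TakeAvailable(1) == 0 {
		return http.StatusTooManyRequests // no token: reject immediately
	}
	return http.StatusOK
}

func main() {
	bucket := &fixedTokens{left: 2}
	for i := 0; i < 3; i++ {
		fmt.Println(statusFor(bucket)) // 200, 200, then 429
	}
}
```

Because the call never blocks, the request path stays fast even when the limit has been reached.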

There is another way to consume tokens: request a specified number of tokens together with a maximum wait time. If the tokens cannot be obtained within that time, failure is returned immediately and no tokens are consumed, leaving the caller to handle the failure in its business logic. If they can be obtained within that time, the remaining token count is driven negative and the required wait is returned.

func (tb *Bucket) TakeMaxDuration(count int64,
  maxWait time.Duration) (time.Duration, bool) {

  tb.mu.Lock()
  defer tb.mu.Unlock()
  return tb.take(tb.clock.Now(), count, maxWait)
}
func (tb *Bucket) take(now time.Time, count int64,
  maxWait time.Duration) (time.Duration, bool) {

  // Check the parameter and the remaining token count
  if count <= 0 {
    return 0, true
  }
  tick := tb.currentTick(now)
  tb.adjustavailableTokens(tick)
  avail := tb.availableTokens - count
  if avail >= 0 {
    // There are enough tokens left, so consume them
    tb.availableTokens = avail
    return 0, true
  }

  // Not enough tokens: compute how long it takes for the missing ones to be produced
  // (ceiling division of the deficit by quantum gives the number of ticks needed)
  endTick := tick + (-avail+tb.quantum-1)/tb.quantum
  endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
  waitTime := endTime.Sub(now)
  if waitTime > maxWait {
    // The maximum wait time would be exceeded, so return failure
    return 0, false
  }
  // Enough tokens will be produced within the wait time.
  // The remaining count goes negative: tokens are borrowed from the future,
  // so this is only appropriate in specific scenarios
  tb.availableTokens = avail
  return waitTime, true
}
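The wait-time calculation is easier to follow with numbers plugged in. This is a sketch under stated assumptions: `waitFor` is a hypothetical free function that takes the deficit (the negated `avail`) and the time elapsed since the bucket's start, rather than reading them from a bucket.

```go
package main

import (
	"fmt"
	"time"
)

// waitFor returns how long a caller must wait for a token deficit to be
// covered, given the time elapsed since the bucket started.
// deficit is the number of missing tokens (the negated avail, so > 0).
func waitFor(deficit, quantum int64, fillInterval, elapsed time.Duration) time.Duration {
	tick := int64(elapsed / fillInterval)
	// Ceiling division: ticks needed to produce at least deficit tokens.
	ticksNeeded := (deficit + quantum - 1) / quantum
	endTime := time.Duration(tick+ticksNeeded) * fillInterval
	return endTime - elapsed
}

func main() {
	// 250ms after start, 3 tokens short, 2 tokens per 100ms tick:
	// the current tick is 2, two more ticks are needed, so tokens arrive at 400ms.
	fmt.Println(waitFor(3, 2, 100*time.Millisecond, 250*time.Millisecond)) // 150ms
}
```

The wait is measured to the tick boundary at which the last missing token is produced, so it is shorter than a whole number of intervals whenever the current interval is already partly elapsed.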

The package also provides a more commonly used function, Take, whose maximum wait time is an extremely large value, so it always consumes future tokens when there are not enough:

const infinityDuration time.Duration = 0x7fffffffffffffff

Take is a special case of TakeMaxDuration.

The last function, Wait, builds on Take: if future tokens were consumed, it blocks until they have been produced.

func (tb *Bucket) Wait(count int64) {
  if d := tb.Take(count); d > 0 {
    tb.clock.Sleep(d)
  }
}

The rest of the code consists of constructors in various forms, which I won't go into.