Hello, I’m from Go School, and we’re going to talk about how LeakyBucket works.

Leaky bucket (LeakyBucket) is a rate-limiting algorithm in which requests are queued up and processed at a fixed rate.

A bucket has a certain capacity, that is, a maximum number of requests it can hold. If the number of queued requests exceeds the bucket’s capacity, incoming requests are filtered out and no longer processed. In other words, requests are queued in the bucket, and the system or service takes them out of the bucket at a constant rate for processing. If the number of queued requests exceeds the maximum number that the bucket can hold, that is, the bucket is full, new requests are discarded.

There are many ways to implement the algorithm; this article explains an implementation based on a counting principle. The counting principle essentially means that, when a request arrives, we calculate how long it should wait to be processed based on the current time and the scheduled processing rate.

The main attribute fields of the algorithm are as follows:

  • Rate of processing requests (rate). This value represents how often a request is processed, which is essentially how long we must wait after one request before the next one can be processed. For example, if we initialize the service to process one request every 100ms, then after each request we need to wait 100ms before the next request is processed.
  • Maximum capacity of the bucket (capacity). This value indicates the maximum number of queued requests we allow. If an incoming request would push the queue past this value, it is rejected immediately rather than made to wait. There are many similar scenes in life: once we went to the park and queued for a boat ride. The line was very long, and the supervisor came over and told us that only the first 20 people could stay in the queue; everyone after the 20th was turned away.
  • The time at which the last queued request in the bucket will be processed (last). This value has two functions:
    • The first is to calculate, when a new request comes in, the time at which it will be processed: last + rate
    • The second is to calculate how many requests are currently waiting to be processed, based on last, the current time t, and the rate (see the numeric sketch just below):
waitRequests = (last - t) / rate
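To make the two uses of last concrete, here is a minimal numeric sketch; the 100ms rate and the timestamps are assumptions for illustration:

package main

import (
    "fmt"
    "time"
)

func main() {
    rate := 100 * time.Millisecond
    t := time.Now()                       // current time
    last := t.Add(300 * time.Millisecond) // when the last queued request will be processed

    // 1) A new request arriving now would be processed at last + rate.
    processAt := last.Add(rate)
    fmt.Println("new request processed at t +", processAt.Sub(t)) // t + 400ms

    // 2) Number of requests currently waiting: (last - t) / rate.
    waitRequests := int64(last.Sub(t) / rate)
    fmt.Println("requests waiting:", waitRequests) // 3
}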

Based on the key property fields above, we can define LeakyBucket as follows:

type LeakyBucket struct {
    rate     int64 // The rate at which requests are processed, in nanoseconds per request
    capacity int64 // The maximum capacity of the bucket, i.e. the maximum number of queued requests
    last     int64 // The Unix-nanosecond time at which the last queued request in the bucket will be processed
}

Then, LeakyBucket also has a Limit function:

func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error)

The main purpose of this function is to calculate how long an incoming request must wait before it is processed. Two points about this function:

  • This function needs to be called once for each request received; each call is equivalent to one request flowing into the bucket.
  • The return value represents how long the caller has to wait before processing the request; the caller sleeps for that duration, and this sleep is what controls the consumption rate, as the sketch below shows.
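For example, a caller might use Limit roughly like this. This is a minimal sketch that assumes the LeakyBucket type defined above; the surrounding request-handling code is hypothetical:

func process(ctx context.Context, bucket *LeakyBucket) error {
    // Each incoming request flows into the bucket through one call to Limit.
    wait, err := bucket.Limit(ctx)
    if err != nil {
        // The bucket is full: the request is rejected and usually simply discarded.
        return err
    }
    // Sleep for the returned duration; this sleep is what enforces the fixed consumption rate.
    time.Sleep(wait)
    // ... handle the actual request here ...
    return nil
}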

With that in place, let’s look at how requests flow into and out of the bucket.

How are requests queued?

Assume that the LeakyBucket is now empty, leaks one request every 100ms, and has a capacity of 5. Now suppose five requests flow into the bucket at the same time; let’s look at how each request’s estimated processing time and wait time are calculated through Limit.

  • The first request comes in and is processed without waiting. Its processing time is last = current time t
  • The second request, because the first request has just been queued and processed, has to wait one processing interval; its processing time is the first request’s processing time + rate, i.e. last = t + 100ms
  • The third request has to wait until the second request is processed and then one more interval, i.e. last = the second request’s processing time + 100ms = t + 200ms
  • The fourth and fifth requests follow by analogy (t + 300ms and t + 400ms respectively), as shown below:

It’s a little more intuitive to lay this sequence out on a timeline:

In the case above, we assumed that n requests pour into an empty bucket at the same time. From the second request onward, each request’s processing time is the previous request’s processing time + rate.
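That scheduling rule can be sketched in a few lines of Go; this is a minimal illustration assuming an empty bucket, a 100ms rate, and five simultaneous requests:

package main

import (
    "fmt"
    "time"
)

func main() {
    rate := 100 * time.Millisecond
    processAt := time.Duration(0) // the first request is processed immediately, at t

    for i := 1; i <= 5; i++ {
        fmt.Printf("request %d is processed at t + %v\n", i, processAt)
        processAt += rate // each following request is processed one rate interval later
    }
}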

But what if the new request flows in 50ms after the last request was processed, that is, at the last + 50ms time point? If we naively scheduled it one full rate interval after its arrival time, the gap between it and the last processed request would be:

(last + 50ms + rate) - last = rate + 50ms

In this way, the interval is 50ms longer than the rate, which is slower than necessary. So how should it be calculated? The actual processing time should first compute the interval between the current time and the last processing time, then compare it with rate to see how much of the rate is still missing, and make up only that difference, namely:

delta = current time t - last
processing time = current time t + (rate - delta)

As shown below:
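A minimal numeric sketch of this calculation, using the 50ms example above (the timestamps are assumptions for illustration):

package main

import (
    "fmt"
    "time"
)

func main() {
    rate := int64(100 * time.Millisecond)
    last := time.Now().UnixNano()            // when the previous request was processed
    now := last + int64(50*time.Millisecond) // the new request arrives 50ms later

    var offset int64
    delta := now - last // 50ms have passed since the previous request was processed
    if delta < rate {
        offset = rate - delta // the new request only needs to wait the remaining 50ms
    }
    processAt := now + offset // = last + rate, exactly one interval after the previous request

    fmt.Println("wait:", time.Duration(offset))                   // 50ms
    fmt.Println("gap since last:", time.Duration(processAt-last)) // 100ms
}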

How to calculate if the bucket is full?

When new requests come in and the bucket is full, they are no longer queued and are simply discarded. So how do we determine whether the bucket is full? We calculate how many requests are currently waiting in the bucket and compare that number with the bucket’s capacity:

wait := (last - now) / rate
if wait > capacity {
    // The bucket is full
} else {
    // Not full, the request can continue to queue
}

In this sense, the capacity of the bucket actually represents the maximum time the business is willing to let a request wait before being processed. For example, a web user may be willing to wait at most 10 seconds for a page. If a request waits 11 seconds before being processed, the user has already left even if it is eventually processed, so the wait is meaningless.
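As a minimal sketch of that relationship, assuming a 100ms rate and a 10-second maximum acceptable wait:

package main

import (
    "fmt"
    "time"
)

func main() {
    rate := 100 * time.Millisecond
    maxWait := 10 * time.Second
    capacity := int64(maxWait / rate) // at most 100 requests may wait in the bucket
    fmt.Println("capacity:", capacity)
}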

OK, the key points have been introduced. Below we paste part of the implementation code; the complete code can be found at github.com/uber-go/rat… and github.com/mennanov/li…

// ErrLimitExhausted is returned when the bucket is full and the new request has to be dropped.
var ErrLimitExhausted = errors.New("requests limit exhausted")

type LeakyBucket struct {
    rate     int64 // The rate at which requests are processed, in nanoseconds per request
    capacity int64 // The maximum capacity of the bucket, i.e. the maximum number of queued requests
    last     int64 // The Unix-nanosecond time at which the last queued request in the bucket will be processed
    mu       sync.Mutex
}

func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error) {
    // Lock so that concurrent requests are queued one at a time
    t.mu.Lock()
    defer t.mu.Unlock()

    last := t.last               // Remember the previous state so it can be restored if the request is rejected
    now := time.Now().UnixNano() // The current time in nanoseconds
    if now < t.last {
        // There are still requests queued in the bucket: the new request is processed one rate interval after the last one
        t.last += t.rate
    } else {
        // The bucket is empty, perhaps in its initial state, or perhaps all queued requests have been processed

        var offset int64      // How long the new request has to wait before it is processed
        delta := now - t.last // How long it has been since the last request was processed
        if delta < t.rate {
            // The next processing slot has not arrived yet, so the request has to wait for the remaining offset
            offset = t.rate - delta
        }
        // If delta >= t.rate, more than one rate interval has passed since the last request was processed,
        // offset stays 0, and the new request can be processed immediately
        t.last = now + offset // Update the time at which this request should be processed
    }

    wait := t.last - now // How long this request would have to wait; used to check whether the bucket is full
    if wait/t.rate > t.capacity {
        // The bucket is full: return an error so the caller can discard the request (or wait anyway).
        // It is usually discarded directly.
        t.last = last // Since the request is discarded, restore the state to what it was before the request queued
        return time.Duration(wait), ErrLimitExhausted
    }

    // Queued successfully: return the wait time to the caller, who sleeps for that duration
    // so that requests are consumed at the fixed rate
    return time.Duration(wait), nil
}

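Finally, a minimal usage sketch. It assumes the LeakyBucket code above, ErrLimitExhausted, and the standard context, fmt, sync and time imports in the same package: ten requests arrive at once, the bucket leaks one request every 100ms, and the requests that cannot fit into the queue are dropped.

func main() {
    bucket := &LeakyBucket{
        rate:     int64(100 * time.Millisecond), // leak one request every 100ms
        capacity: 5,                             // at most 5 requests may wait in the bucket
    }

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            wait, err := bucket.Limit(context.Background())
            if err != nil {
                fmt.Printf("request %d dropped: bucket is full\n", i)
                return
            }
            time.Sleep(wait) // block until it is this request's turn to leak out
            fmt.Printf("request %d processed after waiting %v\n", i, wait)
        }(i)
    }
    wg.Wait()
}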

Conclusion

The core idea of LeakyBucket is to process requests at a fixed rate, so it cannot absorb sudden bursts of traffic: no matter how much traffic arrives, it is processed at the same fixed rate. The implementation of the algorithm essentially calculates, at that fixed processing rate, when each request can be processed and how long it needs to wait.