In distributed systems, caching, rate limiting, and degradation are common techniques for keeping a system running under highly concurrent access. When requests surge suddenly and access is not restricted, the entire system may crash and the service become unavailable.
Some business scenarios also need explicit access limits, such as sending SMS captchas or calling third-party APIs. Requests that consume heavy resources, such as data exports, likewise require a cap on access frequency.
Common rate-limiting algorithms are:
- Token bucket algorithm
- Leaky bucket algorithm
- Counter algorithm
This article introduces the basic principles of the three algorithms and the token bucket implementation in Google Guava's RateLimiter. The next article introduces a recently written distributed rate limiter based on RateLimiter and a counter-based rate limiter.
Token bucket algorithm
The token bucket algorithm puts tokens into a bucket at a constant rate. Each request must first obtain a token from the bucket before it is processed; when the bucket is empty, the request is not processed and is instead queued, degraded, or rejected outright. When the bucket is full, newly added tokens are discarded.
The processing diagram of token bucket algorithm is as follows (picture from network)
The token bucket algorithm mainly controls the average processing rate of requests. It allows pre-consumption: tokens can be consumed in advance to absorb bursts of requests, but later requests pay for the pre-consumption by waiting longer, so that the average processing rate stays constant.
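The idea can be sketched in a few lines. This is a minimal, single-threaded illustration, not Guava's actual implementation; the class and field names here are invented for the example:

```java
// Minimal token-bucket sketch (illustrative only).
// Tokens are refilled lazily based on elapsed time, capped at capacity.
class TokenBucket {
    private final double capacity;      // max tokens the bucket can hold
    private final double refillPerNano; // tokens added per nanosecond
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(double capacity, double tokensPerSecond, long nowNanos) {
        this.capacity = capacity;
        this.refillPerNano = tokensPerSecond / 1_000_000_000.0;
        this.tokens = capacity; // start full
        this.lastRefillNanos = nowNanos;
    }

    /** Try to take one token at the given time; returns false when the bucket is empty. */
    boolean tryAcquire(long nowNanos) {
        // lazy refill: add tokens for the elapsed time, discard overflow
        tokens = Math.min(capacity, tokens + (nowNanos - lastRefillNanos) * refillPerNano);
        lastRefillNanos = nowNanos;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

With a capacity of 2 and a rate of 1 token/s, two immediate acquires succeed, a third fails, and one second later a refilled token is available again.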
Leaky bucket algorithm
In the leaky bucket algorithm, water (requests) flows into the bucket, and the bucket leaks at a fixed rate (processing requests). When water flows in faster than it leaks out, it accumulates in the bucket until the bucket is full and overflows (requests are rejected).
The processing diagram of leaky bucket algorithm is as follows (picture from network)
The leaky bucket algorithm mainly controls the processing rate of requests and smooths bursts of network traffic: requests may enter the bucket at any rate, but they are processed at a constant rate.
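A leaky bucket can be sketched similarly (again an invented illustration, not a library API): the bucket drains at a constant rate, and an arrival is rejected when it would overflow.

```java
// Minimal leaky-bucket sketch (illustrative only).
// Water (requests) drains at a constant rate; arrivals that would
// overflow the bucket are rejected.
class LeakyBucket {
    private final double capacity;     // bucket size
    private final double leakPerNano;  // drain rate, requests per nanosecond
    private double water;              // current level
    private long lastLeakNanos;

    LeakyBucket(double capacity, double leaksPerSecond, long nowNanos) {
        this.capacity = capacity;
        this.leakPerNano = leaksPerSecond / 1_000_000_000.0;
        this.water = 0.0;
        this.lastLeakNanos = nowNanos;
    }

    /** Try to add one request at the given time; returns false on overflow. */
    boolean tryAdd(long nowNanos) {
        // drain the water that leaked out since the last call
        water = Math.max(0.0, water - (nowNanos - lastLeakNanos) * leakPerNano);
        lastLeakNanos = nowNanos;
        if (water + 1.0 <= capacity) {
            water += 1.0;
            return true;
        }
        return false;
    }
}
```

Note the contrast with the token bucket: a leaky bucket bounds the outflow rate strictly, so it cannot pass bursts through, while a token bucket lets stored tokens absorb a burst.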
Counter algorithm
The counter algorithm is the simplest rate-limiting algorithm: it limits how many requests can be processed within a time window, for example at most 10 requests per minute. The 60s window starts at the time of the first request, and the next window starts when the first request arrives after the previous window has ended. Common features such as allowing an SMS verification code to be requested only once per minute can be implemented with a counter.
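A fixed-window counter as described above can be sketched as follows (an illustrative class with invented names, matching the "window starts at the first request" behavior in the text):

```java
// Fixed-window counter sketch (illustrative only): at most `limit` requests
// per window; a window opens at the first request after the previous
// window has ended.
class WindowCounter {
    private final int limit;
    private final long windowNanos;
    private long windowEndNanos = Long.MIN_VALUE; // no window open yet
    private int count;

    WindowCounter(int limit, long windowNanos) {
        this.limit = limit;
        this.windowNanos = windowNanos;
    }

    /** Count one request at the given time; returns false when the window is full. */
    boolean tryAcquire(long nowNanos) {
        if (nowNanos >= windowEndNanos) {
            // previous window is over: this request opens a new one
            windowEndNanos = nowNanos + windowNanos;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}
```

With a limit of 2 per 60s, the first two requests pass, the third in the same window is rejected, and a request 60s later opens a fresh window and passes.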
Guava RateLimiter parsing
Guava is an open source toolkit from Google, and RateLimiter is its rate-limiting utility class implementing the token bucket algorithm. To use RateLimiter, add the Guava dependency to pom.xml:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
The following test code shows basic usage of RateLimiter:
public static void main(String[] args) {
    RateLimiter rateLimiter = RateLimiter.create(1); // create a token bucket that generates one token per second
    for (int i = 1; i <= 5; i++) {
        double waitTime = rateLimiter.acquire(i);
        System.out.println("acquire:" + i + " waitTime:" + waitTime);
    }
}
After running, the output is as follows:
acquire:1 waitTime:0.0
acquire:2 waitTime:0.997729
acquire:3 waitTime:1.998076
acquire:4 waitTime:3.000303
acquire:5 waitTime:4.000223
The first acquire of 1 token returns immediately with a 0s wait (thanks to the bucket's pre-consumption feature). The second acquire asks for 2 tokens and waits about 1s: that 1s pays for the token pre-consumed by the first call. This call in turn pre-consumes 2 tokens, so the next acquire (for 3) must first wait the 2s owed for that pre-consumption, and so on. The wait skipped by pre-consumption is always paid by the next call, which keeps the average processing rate constant (here, one token per second).
RateLimiter has two implementations:
- SmoothBursty: tokens are generated at a constant rate. A SmoothBursty instance is created with RateLimiter.create(double permitsPerSecond).
- SmoothWarmingUp: token generation speeds up until it reaches a stable rate. As the name suggests, there is a warm-up period. RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) creates a SmoothWarmingUp instance, where warmupPeriod is the time taken to warm up to the stable rate.
The class structure is as follows
Key attributes and method resolution (SmoothBursty as an example)
Source code analysis:
Basic usage:
RateLimiter rateLimiter = RateLimiter.create(QPS);
if(rateLimiter.tryAcquire()){
System.out.println("Handle request");
}else{
System.out.println("Reject request");
}
Core fields:
- permitsPerSecond: the number of tokens generated per second;
- maxBurstSeconds: the maximum number of seconds' worth of tokens the bucket can store;
- maxPermits: the maximum number of tokens, equal to permitsPerSecond * maxBurstSeconds;
- stableIntervalMicros: the number of microseconds required to generate one token;
- storedPermits: the number of tokens currently stored;
- nextFreeTicketMicros: the core field, the next point in time at which tokens can be granted; it may lie in the past or in the future.
Core idea: pre-allocation
- Sleep until nextFreeTicketMicros.
- At that point tryAcquire can grant tokens regardless of the current storedPermits.
- Push nextFreeTicketMicros forward to nextFreeTicketMicros + (permits - storedPermits) * stableIntervalMicros. Even when the current storedPermits are insufficient, rate limiting is still achieved by pushing back the next grant time: no tokens are granted before that point.
This is the core idea, stated to aid understanding; some boundary cases and the WarmUp implementation details are omitted.
Note that this is not the conventional approach of accumulating N tokens and then handing them out. Instead, tokens are granted first and the next grant time is pushed back.
For example: suppose 10 tokens can be allocated per second and the current time is 2s, so 20 tokens have been stored, and a request now asks for 30 tokens. One approach is to wait 1 second, until 30 tokens have accumulated at the 3s mark, and then grant them. Guava instead grants the request immediately and simply sets the next allocation point to 3s.
Both approaches satisfy the rate requirement: 30 tokens allocated within 3 seconds.
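The arithmetic of the example above can be checked in a few lines. This is a sketch using the field names from the source analysis; the method and class names are invented for illustration:

```java
// Sketch of Guava's pre-allocation arithmetic (names invented for this example):
// grant the request immediately and push the next grant time back to cover
// the tokens "owed".
class PreConsumeDemo {
    /** Returns the next grant time after pre-consuming requiredPermits. */
    static long reserve(long nextFreeTicketMicros, double storedPermits,
                        double stableIntervalMicros, int requiredPermits) {
        double spend = Math.min(requiredPermits, storedPermits);
        double freshPermits = requiredPermits - spend; // tokens owed
        return nextFreeTicketMicros + (long) (freshPermits * stableIntervalMicros);
    }

    public static void main(String[] args) {
        // rate 10 tokens/s (stable interval 100,000 us), 20 tokens stored,
        // now = 2s, request for 30 tokens: granted at once, and the next
        // grant point moves to t = 3s
        System.out.println(reserve(2_000_000, 20, 100_000, 30)); // prints 3000000
    }
}
```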
Core method
create(double permitsPerSecond)
The create(double permitsPerSecond) method creates a SmoothBursty instance with maxBurstSeconds set to 1s by default. SleepingStopwatch is Guava's clock implementation.
The instance is initialized by calling SmoothBursty.doSetRate(double, long).
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
In Node there is no need for the corresponding static create factory; constructing the class directly is enough.
class GuavaRateLimiter {
constructor(permitsPerSecond) {
this.permitsPerSecond = permitsPerSecond // The number of tokens generated per second
this.maxBurstSeconds = 1 // The number of tokens that can be held in a bucket for up to 1 second
this.rateLimiter = new SmoothBursty(new SleepingStopwatch(), this.maxBurstSeconds)
this.rateLimiter.setRate(permitsPerSecond)
}
setRate(permitsPerSecond) {
this.rateLimiter.setRate(permitsPerSecond)
Logger.info('update rate : ' + permitsPerSecond)
}
}
doSetRate(double, long)
In doSetRate method:
-
Resync (nowMicros) adjusts storedPermits and nextFreeTicketMicros — if the current time is later than nextFreeTicketMicros, the number of tokens generated during this time is counted. Accumulate to storedPermits, and update the next available token time nextFreeTicketMicros to the current time.
-
Calculate the value of stableIntervalMicros, 1/permitsPerSecond.
-
Call the doSetRate(double, double) method to calculate maxBurstSeconds*permitsPerSecond, and adjust storedPermits based on the old maxPermits value.
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
        double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
        storedPermits = min(maxPermits, storedPermits + newPermits);
        nextFreeTicketMicros = nowMicros;
    }
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = this.maxPermits;
    maxPermits = maxBurstSeconds * permitsPerSecond;
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // When would maxPermits be infinite? Not in common applications
        storedPermits = maxPermits;
    } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
    }
}
Implement the following in Node:
/** * constant rate token limiter */
class SmoothRateLimiter {
constructor(stopwatch) {
this.stopwatch = stopwatch
this.storedPermits = 0 // double
this.maxPermits = 0 // double
this.stableIntervalMicros = 0 // double
this.nextFreeTicketMicros = 0 // long
}
setRate(permitsPerSecond) {
let nowMicros = this.stopwatch.readMicros()
this.resync(nowMicros)
this.stableIntervalMicros = 1000000 / permitsPerSecond // double
this.doSetRate(permitsPerSecond, this.stableIntervalMicros)
}
queryEarliestAvailable(nowMicros) {
debug(this.nextFreeTicketMicros + ':' + nowMicros)
return this.nextFreeTicketMicros
}
reserveEarliestAvailable(requiredPermits, nowMicros) {
this.resync(nowMicros)
let returnValue = this.nextFreeTicketMicros
let storedPermitsToSpend = min(requiredPermits, this.storedPermits)
let freshPermits = requiredPermits - storedPermitsToSpend
debug('freshPermits : ' + freshPermits)
let waitMicros = freshPermits * this.stableIntervalMicros
debug('waitMicros : ' + waitMicros)
debug('nextFreeTicketMicros : ' + this.nextFreeTicketMicros)
this.nextFreeTicketMicros += waitMicros
this.storedPermits -= storedPermitsToSpend
debug('storedPermits : ' + this.storedPermits)
debug('maxPermits : ' + this.maxPermits)
return returnValue
}
/** * delay calculation * @param {*} nowMicros */
resync(nowMicros) {
if (nowMicros > this.nextFreeTicketMicros) {
this.storedPermits = min(this.maxPermits, this.storedPermits + (nowMicros - this.nextFreeTicketMicros) / this.stableIntervalMicros)
this.nextFreeTicketMicros = nowMicros
debug('resync : ' + this.nextFreeTicketMicros + ':' + nowMicros + ':' + this.storedPermits)
}
}
}

/** constant speed token bucket */
class SmoothBursty extends SmoothRateLimiter {
constructor(stopwatch, maxBurstSeconds) {
super(stopwatch)
this.maxBurstSeconds = maxBurstSeconds
}
doSetRate(permitsPerSecond, stableIntervalMicros) {
let oldMaxPermits = this.maxPermits
this.maxPermits = this.maxBurstSeconds * permitsPerSecond
this.storedPermits = (oldMaxPermits === 0.0) ? 0.0 : this.storedPermits * this.maxPermits / oldMaxPermits
}
}
The core method acquire(int)
Calling acquire(int) obtains the specified number of tokens:
- It calls reserve(int), which ultimately calls reserveEarliestAvailable(int, long) to update the next token grant time and the number of currently stored tokens, and returns the grant time; the wait time is computed from that time.
- It blocks for the wait time returned in step 1.
- It returns the time waited, in seconds.
/** Acquire the given number of tokens, blocking until they are granted; returns the waiting time */
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
// Apply for a quota in advance, but this quota will not be available until the time is up. Whether you need to wait depends on the time value returned
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}

/** Return the waiting time */
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
/** Update the next redeeming token point in time and the number of stored tokens based on the number of tokens to be obtained this time, return the redeeming token point in time */
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); // Update current data
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // The number of tokens that can be consumed this time
double freshPermits = requiredPermits - storedPermitsToSpend; // The number of tokens to be added
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); // The time required to wait
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // Update the point at which the next token is taken
this.storedPermits -= storedPermitsToSpend; // Update the number of tokens currently stored
return returnValue;
}
The acquire(int) method blocks until the tokens are acquired; it never returns without obtaining them.
tryAcquire(int, long, TimeUnit)
The tryAcquire(int, long, TimeUnit) method attempts to acquire tokens within the specified timeout and returns whether the acquisition succeeded before the timeout expired:
- Tokens can be acquired when nextFreeTicketMicros <= nowMicros + timeoutMicros, i.e. the next grant time is no later than the current time plus the timeout (the pre-consumption feature); otherwise they cannot.
- If they cannot be acquired, return false immediately.
- If they can, call reserveAndGetWaitLength(permits, nowMicros) to update the next grant time and the stored token count, get the wait time (same logic as before), block for that time, and return true.
The Node implementation:
class GuavaRateLimiter {
/**
 * Attempt to obtain the given number of tokens within the specified timeout.
 * Returns whether the tokens were successfully obtained.
 * @param {int} permits
 * @param {int} timeout // MILLISECONDS
 */
tryAcquire(permits, timeout) {
let timeoutMicros = timeout * 1000
let microsToWait = 0
let nowMicros = this.rateLimiter.stopwatch.readMicros()
if (!this.canAcquire(nowMicros, timeoutMicros)) { // Determines whether the specified number of tokens can be obtained within the timeout period
return false
} else {
microsToWait = this.reserveAndGetWaitLength(permits, nowMicros)
}
this.rateLimiter.stopwatch.sleepMicrosUninterruptibly(microsToWait)
return true
}
/**
 * Tokens can be acquired when the earliest available time is no later than nowMicros + timeoutMicros (the pre-consumption feature!)
 * @param {*} nowMicros
 * @param {*} timeoutMicros
 * @returns
 */
canAcquire(nowMicros, timeoutMicros) {
return this.rateLimiter.queryEarliestAvailable(nowMicros) <= nowMicros + timeoutMicros
}
/**
 * Get the time at which the tokens can be issued
 * @param {*} permits
 * @param {*} nowMicros
 * @returns
 */
reserveAndGetWaitLength(permits, nowMicros) {
let momentAvailable = this.rateLimiter.reserveEarliestAvailable(permits, nowMicros)
return max(momentAvailable - nowMicros, 0) // Ensure that the value is not negative
}
}
queryEarliestAvailable(nowMicros) {
return this.nextFreeTicketMicros
}
reserveEarliestAvailable(requiredPermits, nowMicros) {
this.resync(nowMicros)
let returnValue = this.nextFreeTicketMicros
let storedPermitsToSpend = min(requiredPermits, this.storedPermits)
let freshPermits = requiredPermits - storedPermitsToSpend
let waitMicros = freshPermits * this.stableIntervalMicros
this.nextFreeTicketMicros += waitMicros
this.storedPermits -= storedPermitsToSpend
return returnValue
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    // Convert timeout to microseconds
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
        long nowMicros = stopwatch.readMicros();
        // Determine whether the tokens can be obtained within timeoutMicros
        if (!canAcquire(nowMicros, timeoutMicros)) {
            return false;
        } else {
            // Reserve the tokens and get the number of microseconds to wait
            microsToWait = reserveAndGetWaitLength(permits, nowMicros);
        }
    }
    // Wait for microsToWait
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
}
This is the code that tries to acquire tokens; the key function is reserveAndGetWaitLength:
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
/* nextFreeTicketMicros is the next point in time at which tokens can be granted. It is the value returned; the caller then invokes stopwatch.sleepMicrosUninterruptibly(microsToWait), i.e. blocks until that point */
long returnValue = nextFreeTicketMicros;
// Calculate how many tokens need to be used
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// Calculate how many tokens are still owed if you need to obtain more than the temporary token
double freshPermits = requiredPermits - storedPermitsToSpend;
// Calculate how long it will take to accumulate the tokens owed
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// The key trick: credit! Instead of sleeping for waitMicros here, push back the point at which the next tokens can be granted
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
That's the basic flow of the SmoothBursty implementation. Two points to note:
- RateLimiter supports a degree of burst traffic (pre-consumption) by making subsequent requests bear the wait time.
- The RateLimiter token bucket is not implemented by a background thread that keeps adding tokens to the bucket. Instead, it works lazily (see resync): before each acquisition it calculates how many tokens could have been generated since the last update, adds them to the bucket, and updates the bookkeeping. This is far more efficient than a thread constantly filling buckets. (Imagine limiting per-user access to an interface by creating one RateLimiter per user, each with its own token-filling thread: with tens of millions of online users, that would be frightening.)
Conclusion
This article introduced three basic rate-limiting algorithms. The token bucket and leaky bucket algorithms mainly limit the speed at which requests are processed and can be classed as speed limiters, while the counter algorithm limits the number of requests processed within a time window and can be classed as a quota limiter (it does not bound speed). Guava's RateLimiter is an implementation of the token bucket algorithm, but it only suits single-node applications, not distributed environments. There are open source projects for rate limiting in distributed environments, such as Alibaba's Sentinel, but introducing Sentinel may be too heavyweight for small projects, which still need rate limiting.
Reference: cidoliu.github.io/2021/02/24/…