In distributed systems that face highly concurrent access, caching, rate limiting, and degradation are common ways to keep the system running normally. If access is not restricted when requests suddenly surge, the entire system may crash and the service may become unavailable.

Some business scenarios, such as sending SMS verification codes or calling third-party APIs, also need access restrictions. So do requests that consume a lot of resources, such as data exports, whose access frequency must be limited.

Common rate limiting algorithms are:

  • Token bucket algorithm

  • Leaky bucket algorithm

  • Counter algorithm

This article introduces the basic principles of these three algorithms and the token bucket implementation in RateLimiter from Google's Guava library. The next article introduces a recently written distributed rate limiter and a counter-based rate limiter built on RateLimiter.

Token bucket algorithm

The token bucket algorithm puts tokens into a bucket at a constant rate. Each request must obtain a token from the bucket before it is processed. When the bucket is empty, the request is not processed: it is queued, handled in a degraded way, or rejected outright. When the bucket is full, newly generated tokens are discarded.

The following diagram illustrates the token bucket algorithm (image from the web).

The token bucket algorithm mainly controls the average processing rate of requests. It allows pre-consumption: tokens can be consumed in advance to absorb a burst of requests, but later requests pay for that pre-consumption by waiting longer, so the average processing rate stays constant.
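The refill-and-take principle described above can be sketched as follows. This is a minimal illustration of the classic form that rejects a request when the bucket is empty; it does not model pre-consumption. The class name `TokenBucket`, its methods, and the millisecond timestamps passed in by the caller are all hypothetical choices made for this sketch.

```javascript
// Minimal token bucket sketch (hypothetical names, timestamps supplied by the caller).
class TokenBucket {
  constructor(ratePerSecond, capacity, nowMs = 0) {
    this.ratePerSecond = ratePerSecond // tokens added per second
    this.capacity = capacity           // tokens beyond this are discarded
    this.tokens = capacity             // start with a full bucket
    this.lastRefillMs = nowMs
  }

  // Add the tokens produced since the last call, capped at capacity.
  refill(nowMs) {
    const elapsedSeconds = (nowMs - this.lastRefillMs) / 1000
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSeconds * this.ratePerSecond)
    this.lastRefillMs = nowMs
  }

  // Take one token if available; otherwise reject the request.
  tryTake(nowMs) {
    this.refill(nowMs)
    if (this.tokens >= 1) {
      this.tokens -= 1
      return true
    }
    return false
  }
}

const bucket = new TokenBucket(1, 2) // 1 token/s, holds at most 2 tokens
const results = [0, 0, 0, 1000, 5000, 5000, 5000].map((t) => bucket.tryTake(t))
console.log(results) // → [ true, true, false, true, true, true, false ]
```

After 4 idle seconds the bucket holds only 2 tokens, not 4, because tokens generated while the bucket is full are discarded.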

Leaky bucket algorithm

The principle of the leaky bucket algorithm is that water (requests) first flows into the leaky bucket, and the bucket drains at a fixed rate (requests are processed). When water flows in faster than it drains, it accumulates in the bucket; once the bucket is full, the water overflows (requests are rejected).

The following diagram illustrates the leaky bucket algorithm (image from the web).

The leaky bucket algorithm mainly controls the processing rate of requests and smooths out burst traffic. Requests can enter the leaky bucket at any rate, but they are processed at a constant rate.
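The inflow and overflow behaviour can be sketched as follows. This is only an admission check under simplifying assumptions: the class name `LeakyBucket` and its methods are hypothetical, timestamps are supplied by the caller in milliseconds, and the actual processing of queued requests at the drain rate is not modelled.

```javascript
// Minimal leaky bucket sketch (hypothetical names, admission check only).
class LeakyBucket {
  constructor(leakPerSecond, capacity, nowMs = 0) {
    this.leakPerSecond = leakPerSecond // requests drained (processed) per second
    this.capacity = capacity           // maximum number of requests waiting in the bucket
    this.water = 0                     // requests currently in the bucket
    this.lastLeakMs = nowMs
  }

  // Remove the water that has drained since the last call.
  leak(nowMs) {
    const elapsedSeconds = (nowMs - this.lastLeakMs) / 1000
    this.water = Math.max(0, this.water - elapsedSeconds * this.leakPerSecond)
    this.lastLeakMs = nowMs
  }

  // Accept the request if it still fits in the bucket; otherwise it overflows.
  tryAdd(nowMs) {
    this.leak(nowMs)
    if (this.water + 1 <= this.capacity) {
      this.water += 1
      return true
    }
    return false
  }
}

const leaky = new LeakyBucket(1, 2) // drains 1 request/s, holds at most 2 requests
const accepted = [0, 0, 0, 1000, 1000].map((t) => leaky.tryAdd(t))
console.log(accepted) // → [ true, true, false, true, false ]
```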

Counter algorithm

The counter algorithm is the simplest rate limiting algorithm: it caps the number of requests that can be processed within a time window. For example, with a limit of 10 requests per minute, at most 10 requests are processed in the 60s window that starts at the first request. The next window starts when the first request arrives after the previous window has ended. Common features such as allowing an SMS verification code to be requested only once per minute can be implemented with a counter.
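A sketch of the window rule described above, using the SMS case (one request per 60s) as the example. The class name `WindowCounter` and the caller-supplied millisecond timestamps are assumptions made for illustration.

```javascript
// Minimal fixed-window counter sketch (hypothetical names).
class WindowCounter {
  constructor(limit, windowMs) {
    this.limit = limit         // maximum requests per window
    this.windowMs = windowMs   // window length in milliseconds
    this.windowStartMs = null  // set by the first request of each window
    this.count = 0
  }

  tryAcquire(nowMs) {
    // Open a new window on the first request, or once the previous window has ended.
    if (this.windowStartMs === null || nowMs - this.windowStartMs >= this.windowMs) {
      this.windowStartMs = nowMs
      this.count = 0
    }
    if (this.count < this.limit) {
      this.count += 1
      return true
    }
    return false
  }
}

const sms = new WindowCounter(1, 60000) // one verification code per minute
const smsResults = [5000, 30000, 64999, 65000].map((t) => sms.tryAcquire(t))
console.log(smsResults) // → [ true, false, false, true ]
```

The window opened at 5000 ms, so the request at 64999 ms is still inside it and is rejected, while the request at 65000 ms opens the next window.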

Guava RateLimiter analysis

Guava is an open source toolkit from Google, and RateLimiter is its rate limiting utility class, which implements the token bucket algorithm. To use RateLimiter, add the Guava dependency to pom.xml:


<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>

The following test code uses RateLimiter:

import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterDemo {
    public static void main(String[] args) {
        // Create a token bucket that generates one token per second
        RateLimiter rateLimiter = RateLimiter.create(1);
        for (int i = 1; i <= 5; i++) {
            double waitTime = rateLimiter.acquire(i);
            System.out.println("acquire:" + i + " waitTime:" + waitTime);
        }
    }
}

Running it produces the following output:

acquire:1 waitTime:0.0
acquire:2 waitTime:0.997729
acquire:3 waitTime:1.998076
acquire:4 waitTime:3.000303
acquire:5 waitTime:4.000223

The first call acquires one token and returns immediately (a wait of 0s): thanks to the pre-consumption feature of the token bucket, it does not have to wait. The second call acquires two tokens and waits 1s. That 1s pays for the token the first call consumed in advance. The two tokens of the second call are again pre-consumed, so the third call (for three tokens) has to wait the 2s they cost, and so on. The wait that a pre-consuming call skips is paid by the next call, which keeps the average processing rate constant (here, one token per second).
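The pattern of wait times can be reproduced with a small virtual-clock model. This is a sketch rather than Guava's code: the function name `simulateAcquires` is hypothetical, nothing actually sleeps, and stored tokens are ignored because the bucket starts empty and the calls are made back to back.

```javascript
// Virtual-clock model of consecutive acquire(n) calls (hypothetical helper, no real sleeping).
function simulateAcquires(permitsPerSecond, requests) {
  const intervalSeconds = 1 / permitsPerSecond // time needed to generate one token
  let now = 0            // virtual clock, in seconds
  let nextFreeTicket = 0 // earliest time at which the next request can be served
  const waits = []
  for (const permits of requests) {
    // This call pays for whatever the previous call consumed in advance.
    const wait = Math.max(nextFreeTicket - now, 0)
    // Its own cost is billed to the next caller by pushing the time point back.
    nextFreeTicket = Math.max(nextFreeTicket, now) + permits * intervalSeconds
    now += wait // "sleep" until the tokens are granted
    waits.push(wait)
  }
  return waits
}

console.log(simulateAcquires(1, [1, 2, 3, 4, 5])) // → [ 0, 1, 2, 3, 4 ]
```

The measured values in the run above differ slightly from these round numbers because real time passes between the calls.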

RateLimiter has two implementations:

  1. SmoothBursty: tokens are generated at a constant rate. RateLimiter.create(double permitsPerSecond) creates a SmoothBursty instance.

  2. SmoothWarmingUp: token generation speeds up gradually until it reaches a stable rate. As the name suggests, there is a warm-up phase. RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) creates a SmoothWarmingUp instance, where warmupPeriod is the time it takes to warm up to the stable rate.

The class structure is as follows

Key fields and methods (using SmoothBursty as an example)

Source code analysis:

Usage


RateLimiter rateLimiter = RateLimiter.create(QPS);

if (rateLimiter.tryAcquire()) {
    System.out.println("Handle request");
} else {
    System.out.println("Reject request");
}

Core fields:

  • permitsPerSecond: the number of tokens generated per second

  • maxBurstSeconds: the maximum number of seconds' worth of tokens the bucket can store

  • maxPermits: the maximum number of tokens, equal to permitsPerSecond * maxBurstSeconds

  • stableIntervalMicros: the number of microseconds needed to generate one token

  • storedPermits: the number of tokens currently stored

  • nextFreeTicketMicros: the core field, the next point in time at which tokens can be granted; it can lie in the past or in the future
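As a quick illustration of how the derived fields follow from the configured ones, the two formulas above can be written out. The helper name `deriveFields` is hypothetical; only the field names come from the list.

```javascript
// Derive maxPermits and stableIntervalMicros from the configured rate (hypothetical helper).
function deriveFields(permitsPerSecond, maxBurstSeconds = 1) {
  return {
    maxPermits: permitsPerSecond * maxBurstSeconds,   // most tokens the bucket can hold
    stableIntervalMicros: 1000000 / permitsPerSecond  // microseconds to generate one token
  }
}

console.log(deriveFields(5)) // → { maxPermits: 5, stableIntervalMicros: 200000 }
```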

Core idea: pre-allocation

  1. Sleep until nextFreeTicketMicros.

  2. At that point tryAcquire can be granted its tokens, regardless of the current storedPermits.

  3. Push nextFreeTicketMicros back to nextFreeTicketMicros + (permits - storedPermits) * stableIntervalMicros. Even if storedPermits cannot cover the request, rate limiting is still achieved, because the next allocation point is pushed back and no tokens are granted before it.

This is only the idea, meant to aid understanding; some boundary cases and the WarmUp implementation details are omitted.

To be clear, this is not the traditional approach of accumulating N tokens and then handing them out. Instead, tokens are granted first and the next allocation point is moved back.

For example:

Suppose 10 tokens are generated per second, the current time is 2s, and 20 tokens are stored. Now a request asks for 30 tokens. One way is to wait 1 second until 30 tokens have accumulated at 3 seconds, and only then grant them. Guava instead grants the tokens right away and sets the next allocation point to 3 seconds.

Both approaches meet the QPS requirement of 30 tokens allocated in 3 seconds.
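The example can be traced with a few lines of code. This sketch follows the numbers of the example literally: the function name `preAllocate` and the plain `state` object are hypothetical, and the maxPermits cap and other boundary cases are deliberately left out.

```javascript
// Grant the permits now and push the next allocation point back (hypothetical helper).
function preAllocate(state, permits, nowMicros) {
  const grantedAt = Math.max(state.nextFreeTicketMicros, nowMicros)
  const fromStore = Math.min(permits, state.storedPermits) // tokens taken from the bucket
  const owed = permits - fromStore                         // tokens consumed in advance
  state.storedPermits -= fromStore
  state.nextFreeTicketMicros = grantedAt + owed * state.stableIntervalMicros
  return grantedAt
}

// 10 tokens per second, current time 2s, 20 tokens stored, request for 30 tokens.
const state = { storedPermits: 20, stableIntervalMicros: 100000, nextFreeTicketMicros: 2000000 }
const grantedAt = preAllocate(state, 30, 2000000)
console.log(grantedAt)                  // → 2000000 (granted immediately, at 2s)
console.log(state.nextFreeTicketMicros) // → 3000000 (next allocation point, at 3s)
```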

Core method

create(double permitsPerSecond)

The create(double permitsPerSecond) method creates a SmoothBursty instance with maxBurstSeconds set to 1s by default. SleepingStopwatch is Guava's clock implementation.

Initialization is completed by calling doSetRate(double, long), which SmoothBursty inherits from SmoothRateLimiter.


static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
		RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
		rateLimiter.setRate(permitsPerSecond);
		return rateLimiter;
}

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
	super(stopwatch);
	this.maxBurstSeconds = maxBurstSeconds;
}

  • In Node there is no need to port the create factory method; a plain class is enough.

class GuavaRateLimiter {
    constructor(permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond // The number of tokens generated per second
        this.maxBurstSeconds = 1 // The bucket stores at most 1 second's worth of tokens
        this.rateLimiter = new SmoothBursty(new SleepingStopwatch(), this.maxBurstSeconds)

        this.rateLimiter.setRate(permitsPerSecond)
    }

    setRate(permitsPerSecond) {
        this.rateLimiter.setRate(permitsPerSecond)
        Logger.info('update rate : ' + permitsPerSecond)
    }
}

doSetRate(double, long)

doSetRate does the following:

  1. Calls resync(nowMicros) to adjust storedPermits and nextFreeTicketMicros: if the current time is later than nextFreeTicketMicros, the tokens generated in between are added to storedPermits, and nextFreeTicketMicros is set to the current time.

  2. Calculates stableIntervalMicros, the interval between two tokens: one second (in microseconds) divided by permitsPerSecond.

  3. Calls doSetRate(double, double), which sets maxPermits to maxBurstSeconds * permitsPerSecond and rescales storedPermits in proportion to the old maxPermits value.


@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
        double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
        storedPermits = min(maxPermits, storedPermits + newPermits);
        nextFreeTicketMicros = nowMicros;
    }
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = this.maxPermits;
    maxPermits = maxBurstSeconds * permitsPerSecond;
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // Special case: an infinite old limit would make the rescaling below produce NaN.
        // Ordinary applications do not reach this branch.
        storedPermits = maxPermits;
    } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
    }
}
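The rescaling of storedPermits in doSetRate(double, double) keeps the bucket filled to the same fraction when the rate changes. A small sketch of just that calculation, with a hypothetical helper name `rescaleStoredPermits`:

```javascript
// Rescale the stored tokens when maxPermits changes (hypothetical helper).
function rescaleStoredPermits(storedPermits, oldMaxPermits, newMaxPermits) {
  if (oldMaxPermits === Infinity) {
    return newMaxPermits // avoid NaN from dividing by an infinite old limit
  }
  return oldMaxPermits === 0
    ? 0 // initial state: the bucket starts empty
    : storedPermits * newMaxPermits / oldMaxPermits
}

// Rate raised from 10 to 20 permits/s with maxBurstSeconds = 1: a half-full bucket stays half full.
console.log(rescaleStoredPermits(5, 10, 20)) // → 10
// First call during create(): no old limit, so nothing is stored yet.
console.log(rescaleStoredPermits(0, 0, 10))  // → 0
```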

The equivalent implementation in Node:


const debug = require('debug')('ratelimiter') // assumption: a logging helper such as the npm "debug" package

/** Constant-rate token limiter */
class SmoothRateLimiter {
    constructor(stopwatch) {
        this.stopwatch = stopwatch
        this.storedPermits = 0 // double
        this.maxPermits = 0 // double
        this.stableIntervalMicros = 0 // double
        this.nextFreeTicketMicros = 0 // long
    }

    setRate(permitsPerSecond) {
        let nowMicros = this.stopwatch.readMicros()
        this.resync(nowMicros)
        this.stableIntervalMicros = 1000000 / permitsPerSecond // double
        this.doSetRate(permitsPerSecond, this.stableIntervalMicros)
    }

    queryEarliestAvailable(nowMicros) {
        debug(this.nextFreeTicketMicros + ':' + nowMicros)
        return this.nextFreeTicketMicros
    }

    reserveEarliestAvailable(requiredPermits, nowMicros) {
        this.resync(nowMicros)
        let returnValue = this.nextFreeTicketMicros
        let storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits)
        let freshPermits = requiredPermits - storedPermitsToSpend
        debug('freshPermits : ' + freshPermits)
        let waitMicros = freshPermits * this.stableIntervalMicros
        debug('waitMicros : ' + waitMicros)
        debug('nextFreeTicketMicros : ' + this.nextFreeTicketMicros)
        this.nextFreeTicketMicros += waitMicros
        this.storedPermits -= storedPermitsToSpend
        debug('storedPermits : ' + this.storedPermits)
        debug('maxPermits : ' + this.maxPermits)
        return returnValue
    }

    /**
     * Lazy calculation: refill the stored tokens up to nowMicros
     * @param {*} nowMicros
     */
    resync(nowMicros) {
        if (nowMicros > this.nextFreeTicketMicros) {
            this.storedPermits = Math.min(this.maxPermits, this.storedPermits + (nowMicros - this.nextFreeTicketMicros) / this.stableIntervalMicros)
            this.nextFreeTicketMicros = nowMicros
            debug('resync : ' + this.nextFreeTicketMicros + ':' + nowMicros + ':' + this.storedPermits)
        }
    }
}

/** Constant-rate token bucket */
class SmoothBursty extends SmoothRateLimiter {
    constructor(stopwatch, maxBurstSeconds) {
        super(stopwatch)
        this.maxBurstSeconds = maxBurstSeconds
    }

    doSetRate(permitsPerSecond, stableIntervalMicros) {
        let oldMaxPermits = this.maxPermits
        this.maxPermits = this.maxBurstSeconds * permitsPerSecond
        this.storedPermits = (oldMaxPermits === 0.0) ? 0.0 : this.storedPermits * this.maxPermits / oldMaxPermits
    }
}

acquire(int)

Calling acquire(int) obtains the specified number of tokens:

  1. It calls reserve(int), which ultimately calls reserveEarliestAvailable(int, long) to update the next token-granting time and the number of stored tokens, and to return the time at which this request's tokens are granted. The wait time is calculated from that point in time.

  2. It blocks for the wait time computed in step 1.

  3. It returns the time waited, in seconds.


/** Acquires the given number of tokens, blocking until they are granted; returns the time spent waiting, in seconds */
@CanIgnoreReturnValue
public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

// Reserves the tokens in advance; they only become usable once the returned wait time has elapsed
final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
        return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}

/** Returns the time to wait */
final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
}

/** Updates the next token-granting time and the number of stored tokens for this request, and returns the time at which this request's tokens are granted */
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros); // Bring the state up to date
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // Stored tokens consumed by this request
    double freshPermits = requiredPermits - storedPermitsToSpend; // Tokens that still have to be generated
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros); // Time needed to generate them

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // Push back the next token-granting time
    this.storedPermits -= storedPermitsToSpend; // Update the number of stored tokens
    return returnValue;
}

If the tokens are not immediately available, acquire(int) blocks until they are.
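In Node, blocking the thread is usually undesirable, so one option is to turn the wait into a promise. The following is a sketch under stated assumptions, not part of Guava or of the port in this article: the class name `AsyncLimiter`, the injectable `readMicros` clock, and the use of setTimeout are choices made for illustration, and stored tokens are left out to keep it short.

```javascript
// Non-blocking variant of acquire: reserve synchronously, wait asynchronously (hypothetical class).
class AsyncLimiter {
  constructor(permitsPerSecond, readMicros = () => Date.now() * 1000) {
    this.stableIntervalMicros = 1000000 / permitsPerSecond
    this.readMicros = readMicros // clock in microseconds, injectable for testing
    this.nextFreeTicketMicros = readMicros()
  }

  // Book the permits and return how long the caller must wait, in microseconds.
  reserve(permits) {
    const nowMicros = this.readMicros()
    const momentAvailable = Math.max(this.nextFreeTicketMicros, nowMicros)
    this.nextFreeTicketMicros = momentAvailable + permits * this.stableIntervalMicros
    return momentAvailable - nowMicros
  }

  // Resolve once the wait is over, with the time waited in seconds.
  async acquire(permits = 1) {
    const microsToWait = this.reserve(permits)
    await new Promise((resolve) => setTimeout(resolve, microsToWait / 1000))
    return microsToWait / 1000000
  }
}

// With the clock frozen at 0, each reservation is billed to the next one.
const frozen = new AsyncLimiter(2, () => 0) // 2 permits per second
const waits = [frozen.reserve(1), frozen.reserve(1), frozen.reserve(3)]
console.log(waits) // → [ 0, 500000, 1000000 ]
```

A caller would then write `await limiter.acquire(1)` before handling a request, which delays only that request and leaves the event loop free.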

tryAcquire(int, long, TimeUnit)

The tryAcquire(int, long, TimeUnit) method attempts to acquire tokens within the specified timeout and returns whether it succeeded:

  1. It checks whether nextFreeTicketMicros <= nowMicros + timeoutMicros, that is, whether the next token-granting time falls within the timeout window. If so, the tokens can be obtained (thanks to pre-consumption); otherwise they cannot.

  2. If they cannot be obtained, it returns false immediately.

  3. If they can, it calls reserveAndGetWaitLength(permits, nowMicros) to update the next token-granting time and the number of stored tokens and to get the wait time (same logic as above), blocks for that time, and returns true.
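The three steps can be condensed into one function to show that a rejected attempt leaves the limiter state untouched. This is a simplified sketch: the function name `tryReserve`, the plain `state` object, and the return value of -1 for "rejected" are assumptions, and stored tokens are omitted.

```javascript
// Check the timeout first, and only then book the permits (hypothetical helper).
function tryReserve(state, permits, nowMicros, timeoutMicros) {
  if (state.nextFreeTicketMicros > nowMicros + timeoutMicros) {
    return -1 // tokens would arrive after the timeout: give up without booking anything
  }
  const momentAvailable = Math.max(state.nextFreeTicketMicros, nowMicros)
  state.nextFreeTicketMicros = momentAvailable + permits * state.stableIntervalMicros
  return momentAvailable - nowMicros // microseconds the caller has to wait
}

// One token per second; the next token is available at 3s and the current time is 2s.
const limiterState = { nextFreeTicketMicros: 3000000, stableIntervalMicros: 1000000 }
const rejected = tryReserve(limiterState, 1, 2000000, 500000)  // timeout 0.5s: too short
const granted = tryReserve(limiterState, 1, 2000000, 1000000)  // timeout 1s: just enough
console.log(rejected, granted, limiterState.nextFreeTicketMicros) // → -1 1000000 4000000
```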

  • The Node implementation:

class GuavaRateLimiter {
    /**
     * Attempts to acquire the given number of tokens within the timeout and
     * returns whether they were acquired.
     * @param {int} permits
     * @param {int} timeout // MILLISECONDS
     */
    tryAcquire(permits, timeout) {
        let timeoutMicros = timeout * 1000
        let microsToWait = 0
        let nowMicros = this.rateLimiter.stopwatch.readMicros()
        if (!this.canAcquire(nowMicros, timeoutMicros)) { // The tokens cannot be obtained within the timeout
            return false
        } else {
            microsToWait = this.reserveAndGetWaitLength(permits, nowMicros)
        }
        this.rateLimiter.stopwatch.sleepMicrosUninterruptibly(microsToWait)
        return true
    }

    /**
     * The tokens can be obtained if the earliest available time is no later
     * than nowMicros + timeoutMicros (pre-consumption feature)
     * @param {*} nowMicros
     * @param {*} timeoutMicros
     * @returns
     */
    canAcquire(nowMicros, timeoutMicros) {
        return this.rateLimiter.queryEarliestAvailable(nowMicros) <= nowMicros + timeoutMicros
    }

    /**
     * Reserve the tokens and return the time to wait until they are granted
     * @param {*} permits
     * @param {*} nowMicros
     * @returns
     */
    reserveAndGetWaitLength(permits, nowMicros) {
        let momentAvailable = this.rateLimiter.reserveEarliestAvailable(permits, nowMicros)
        return Math.max(momentAvailable - nowMicros, 0) // Ensure that the value is not negative
    }
}

// Excerpt of SmoothRateLimiter: the methods called above (debug output omitted)
class SmoothRateLimiter {
    // ...

    queryEarliestAvailable(nowMicros) {
        return this.nextFreeTicketMicros
    }

    reserveEarliestAvailable(requiredPermits, nowMicros) {
        this.resync(nowMicros)
        let returnValue = this.nextFreeTicketMicros
        let storedPermitsToSpend = Math.min(requiredPermits, this.storedPermits)
        let freshPermits = requiredPermits - storedPermitsToSpend
        let waitMicros = freshPermits * this.stableIntervalMicros
        this.nextFreeTicketMicros += waitMicros
        this.storedPermits -= storedPermitsToSpend
        return returnValue
    }
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    // Convert timeout to microseconds
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
        long nowMicros = stopwatch.readMicros();
        // Determine whether the tokens can be obtained within timeoutMicros
        if (!canAcquire(nowMicros, timeoutMicros)) {
            return false;
        } else {
            // Reserve the tokens and get the number of microseconds to wait
            microsToWait = reserveAndGetWaitLength(permits, nowMicros);
        }
    }
    // Wait for microsToWait
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
}

This is the code that tries to acquire tokens; the key function is reserveAndGetWaitLength:

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
}

@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    /* nextFreeTicketMicros is the next point in time at which tokens can be granted.
       It is returned to the caller, which then calls
       stopwatch.sleepMicrosUninterruptibly(microsToWait), i.e. blocks until that point in time. */
    long returnValue = nextFreeTicketMicros;
    // Calculate how many stored tokens will be used
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // Calculate how many tokens are still owed if the request exceeds the stored tokens
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // Calculate how long it takes to generate the owed tokens
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);
    // The key point: credit! Instead of sleeping for waitMicros here, push back
    // the point in time at which the next tokens can be granted
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

That is the basic flow of the SmoothBursty implementation. Two points are worth noting:

  1. RateLimiter supports a degree of burst traffic (pre-consumption) and compensates by making subsequent requests wait longer.

  2. RateLimiter does not implement the token bucket by starting a thread that keeps adding tokens to the bucket. It works lazily instead (see resync): before each acquisition it calculates how many tokens were generated in the elapsed time, adds them to the bucket, and updates its state. This is far more efficient than a thread that keeps filling the bucket. (Imagine restricting access to an interface per user by creating one RateLimiter and one token-filling thread per user. With tens of millions of users online, that would be frightening.)
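The per-user scenario shows why the lazy approach scales: each user costs one small map entry and no timer or thread. The sketch below is a simplified illustration, not Guava's code; the class name `PerUserLimiter`, the map layout, and the reject-when-empty behaviour (no pre-consumption) are assumptions.

```javascript
// Per-user limiting with lazy refill: state is touched only when a request arrives (hypothetical class).
class PerUserLimiter {
  constructor(permitsPerSecond, maxPermits) {
    this.permitsPerSecond = permitsPerSecond
    this.maxPermits = maxPermits
    this.users = new Map() // userId -> { storedPermits, lastMicros }
  }

  tryAcquire(userId, nowMicros) {
    let entry = this.users.get(userId)
    if (!entry) {
      entry = { storedPermits: this.maxPermits, lastMicros: nowMicros }
      this.users.set(userId, entry)
    }
    // Lazy refill: compute what a background thread would have added by now.
    const elapsedSeconds = (nowMicros - entry.lastMicros) / 1000000
    entry.storedPermits = Math.min(this.maxPermits, entry.storedPermits + elapsedSeconds * this.permitsPerSecond)
    entry.lastMicros = nowMicros
    if (entry.storedPermits < 1) {
      return false
    }
    entry.storedPermits -= 1
    return true
  }
}

const perUser = new PerUserLimiter(1, 1) // each user: 1 request per second, bucket of 1
const outcome = [
  perUser.tryAcquire('alice', 0),
  perUser.tryAcquire('alice', 500000),  // only half a token has been generated
  perUser.tryAcquire('bob', 500000),    // other users are unaffected
  perUser.tryAcquire('alice', 1500000)
]
console.log(outcome) // → [ true, false, true, true ]
```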

Conclusion

This article introduced three basic rate limiting algorithms. The token bucket and leaky bucket algorithms limit the speed at which requests are processed and can be classified as speed limiting, while the counter algorithm limits the number of requests processed in a time window and can be classified as quota limiting (it puts no limit on speed). Guava's RateLimiter implements the token bucket algorithm, but it only suits single-node applications, not distributed environments. Open source projects such as Alibaba's Sentinel do provide rate limiting for distributed environments, but Sentinel may be too heavy for small projects, which still need rate limiting.

Reference: cidoliu.github.io/2021/02/24/…