Analysis of the rate limiting implementation in Guava

RateLimiter

Common rate limiting algorithms were introduced in previous articles. In the Java world, the rate limiting tool in Google's Guava library is a common choice for throttling service traffic.

Usage example

Google's open source toolkit Guava provides the RateLimiter utility class, which is convenient to use and implements rate limiting based on the token bucket algorithm.

import com.google.common.util.concurrent.RateLimiter;

@Test
public void testSample() {
    RateLimiter rateLimiter = RateLimiter.create(500); // 500 permits per second
}

The example above creates a RateLimiter that adds 500 tokens per second (one token every 0.002 seconds). The output is as follows:

As can be seen from the output, RateLimiter supports pre-consumption:

  • Request 1 consumes 1 token immediately, without waiting.
  • Request 2 waits 2 seconds to pay for the 1 token that request 1 pre-consumed, and then pre-consumes 6 tokens itself.
  • Likewise, request 3 waits 12 seconds because request 2 pre-consumed 6 tokens.

The wait grows linearly with the number of tokens the previous request pre-consumed.
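To reproduce this behaviour without Guava, here is a minimal sketch of SmoothBursty's bookkeeping. The class BurstyModel is hypothetical: it is single-threaded, the caller passes the current time in microseconds, nothing actually sleeps, and reserve only reports how long the caller would wait. Note that the waits listed above (2 s for 1 pre-consumed token, 12 s for 6) correspond to a rate of 0.5 tokens per second; at 500 tokens per second they would be 2 ms and 12 ms, so the sketch uses 0.5.

```java
/**
 * Simplified model of SmoothBursty's accounting (illustration only, not Guava code).
 * Assumptions: single-threaded, the caller passes the current time in microseconds,
 * and nothing sleeps; reserve() just reports the wait.
 */
final class BurstyModel {
    private final double stableIntervalMicros; // microseconds needed to produce one token
    private final double maxPermits;           // bucket capacity
    private double storedPermits = 0.0;        // tokens currently in the bucket
    private long nextFreeTicketMicros = 0L;    // earliest moment the next request may proceed

    BurstyModel(double permitsPerSecond, double maxBurstSeconds) {
        this.stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        this.maxPermits = maxBurstSeconds * permitsPerSecond;
    }

    /** Reserves permits at nowMicros and returns how many microseconds the caller must wait. */
    long reserve(int permits, long nowMicros) {
        // Lazily credit tokens produced since nextFreeTicketMicros (capped at maxPermits).
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / stableIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
        long momentAvailable = nextFreeTicketMicros; // this request only waits for earlier debt
        double fromBucket = Math.min(permits, storedPermits);
        double fresh = permits - fromBucket;         // tokens that do not exist yet
        nextFreeTicketMicros += (long) (fresh * stableIntervalMicros); // the next request pays
        storedPermits -= fromBucket;
        return Math.max(momentAvailable - nowMicros, 0L);
    }

    public static void main(String[] args) {
        BurstyModel limiter = new BurstyModel(0.5, 1.0); // 0.5 tokens/s, as implied by the waits
        long now = 0L;
        for (int permits : new int[] {1, 6, 2}) {
            long wait = limiter.reserve(permits, now);
            System.out.printf("acquire(%d): wait %.1f s%n", permits, wait / 1e6);
            now += wait; // the caller resumes once its wait is over
        }
    }
}
```

Running main prints waits of 0, 2 and 12 seconds: each request pays for the tokens the previous one took on credit.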

  • RateLimiter tolerates a degree of burst traffic (pre-consumption) by making subsequent requests wait longer.

  • However, this burst-handling capability is sometimes unwanted. For example, an instant messaging (IM) vendor may provide a push interface with a strict frequency limit (600 calls per 30 seconds); calls to such an interface must not be pre-consumed.



  • As the RateLimiter class diagram shows, RateLimiter not only creates the concrete implementation class but also determines which methods the created object exposes. The standard Builder pattern UML diagram is shown below (quoted from Baidu Baike).

Rate limiting utility classes in the Guava package

Introduction to Guava's core rate limiting classes

  • The RateLimiter class is the core rate limiting class; it is a public abstract class. Its subclass SmoothRateLimiter is itself abstract, and SmoothBursty and SmoothWarmingUp are its two concrete implementations, which differ in their token consumption strategy.

Guava provides two rate limiting modes:

  • Stable mode (SmoothBursty): tokens are generated at a constant rate.
  • Warm-up mode (SmoothWarmingUp): the token generation rate ramps up gradually until it reaches a steady value.

The two modes are implemented along similar lines; the main difference lies in how the waiting time is calculated.

Guava RateLimiter core class implementation

  • In practice, the RateLimiter class is usually used directly; the other classes are transparent to the user. RateLimiter is designed with a small trick similar to the Builder pattern, with some adjustments.

The RateLimiter class takes on the responsibilities of both Builder and Product.
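To make the idea concrete, here is a hypothetical miniature of this design: one abstract class whose static create methods pick a private concrete subclass (the factory/Builder role) and whose abstract methods define everything callers can do with the result (the Product role). The names Limiter, Bursty, WarmingUp, getRate and strategy are illustrative only and are not Guava's API; Guava's own factory is RateLimiter.create(double permitsPerSecond) plus overloads that also take a warm-up period.

```java
/**
 * Hypothetical miniature of "factory and product in one class":
 * the abstract Limiter both creates instances and defines the API callers see.
 */
abstract class Limiter {
    // Factory methods: callers never name the concrete subclasses.
    static Limiter create(double permitsPerSecond) {
        return new Bursty(permitsPerSecond);
    }

    static Limiter create(double permitsPerSecond, long warmupMillis) {
        return new WarmingUp(permitsPerSecond, warmupMillis);
    }

    // Product API: the only methods callers can use.
    abstract double getRate();

    abstract String strategy();

    // Concrete products stay private, so they are invisible to callers.
    private static final class Bursty extends Limiter {
        private final double rate;

        Bursty(double rate) { this.rate = rate; }

        @Override double getRate() { return rate; }

        @Override String strategy() { return "bursty"; }
    }

    private static final class WarmingUp extends Limiter {
        private final double rate;
        private final long warmupMillis;

        WarmingUp(double rate, long warmupMillis) {
            this.rate = rate;
            this.warmupMillis = warmupMillis;
        }

        @Override double getRate() { return rate; }

        @Override String strategy() { return "warming-up over " + warmupMillis + " ms"; }
    }
}
```

Callers only ever hold a Limiter reference, so the set of concrete classes can change without affecting client code, which is the property the RateLimiter design relies on.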

SmoothBursty
  • To use the RateLimiter class, first call create to build a rate limiter, for example one that generates 2 tokens per second, and then call acquire (or tryAcquire) to obtain tokens whenever the protected service is about to be called.
Create function analysis
  • The create function is overloaded, and depending on which overload is called, a different concrete subclass of RateLimiter is created.

  • Currently, two subclasses can be returned: SmoothBursty and SmoothWarmingUp.

  • When the single-argument create(double permitsPerSecond) is called, a SmoothBursty instance is actually created; the overload that also takes a warm-up period creates a SmoothWarmingUp.

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

Before explaining how SmoothBursty works, let me explain several attributes it inherits from SmoothRateLimiter:

/** The currently stored permits. */
double storedPermits;

/** The maximum number of stored permits. */
double maxPermits;

/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms. (This is the interval at which tokens are added.)
 */
double stableIntervalMicros;

/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * Because RateLimiter allows pre-consumption, after a request pre-consumes tokens, the next
 * request must wait until nextFreeTicketMicros before it obtains its tokens.
 */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

Implementation of acquire and tryAcquire
  • The interface exposed by RateLimiter is easy to understand:
@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits); // reserve the tokens and compute the wait
  stopwatch.sleepMicrosUninterruptibly(microsToWait); // sleep outside the lock
  return 1.0 * microsToWait / SECONDS.toMicros(1L); // time waited, in seconds
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}
  • The acquire function reserves permits tokens, computes the waiting time, sleeps for that time, and returns it in seconds.
public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false; // cannot get the tokens within the timeout: give up immediately
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait); // sleep outside the lock
  return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

// In SmoothRateLimiter:
@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}
  • The tryAcquire function checks whether the tokens can be obtained within timeout. If they can, it reserves them, sleeps for the required time, and returns true; otherwise it returns false immediately.
  • canAcquire performs that check: the request succeeds if the earliest available moment (nextFreeTicketMicros) minus the timeout is no later than the current time.
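The decision tryAcquire makes can be modelled in a few lines. The sketch below is a hypothetical TryAcquireModel, not Guava code: it ignores stored permits (every permit pushes nextFreeTicketMicros back by one interval), takes the time as an explicit microsecond argument, and returns the wait instead of sleeping. The point it shows is that a failed attempt returns immediately and reserves nothing.

```java
import java.util.OptionalLong;

/**
 * Simplified model of the tryAcquire(permits, timeout, unit) decision (illustration only).
 * Assumptions: time is passed in as microseconds, nothing sleeps, and stored permits are
 * ignored, so every permit pushes nextFreeTicketMicros back by one interval.
 */
final class TryAcquireModel {
    private final long stableIntervalMicros;
    private long nextFreeTicketMicros = 0L;

    TryAcquireModel(long stableIntervalMicros) {
        this.stableIntervalMicros = stableIntervalMicros;
    }

    /** Same test as canAcquire: earliest available moment minus timeout must not be after now. */
    private boolean canAcquire(long nowMicros, long timeoutMicros) {
        return nextFreeTicketMicros - timeoutMicros <= nowMicros;
    }

    /** Returns the wait in microseconds, or empty if the permits are not available in time. */
    OptionalLong tryAcquire(int permits, long nowMicros, long timeoutMicros) {
        long timeout = Math.max(timeoutMicros, 0L);
        if (!canAcquire(nowMicros, timeout)) {
            return OptionalLong.empty(); // fail fast: nothing is reserved, no waiting
        }
        long momentAvailable = Math.max(nextFreeTicketMicros, nowMicros);
        nextFreeTicketMicros = momentAvailable + permits * stableIntervalMicros; // pre-consume
        return OptionalLong.of(momentAvailable - nowMicros);
    }
}
```

With a 100 ms interval, a second zero-timeout attempt at time 0 fails, while the same attempt with a 100 ms timeout succeeds and reports a 100 ms wait.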
Resync function

This function is called before each token acquisition. If the current time is later than nextFreeTicketMicros, it calculates how many tokens were generated in the meantime, adds them to the bucket (capped at maxPermits), and moves nextFreeTicketMicros to the current time. This way the bucket state is brought up to date only once, when a token is requested, instead of being maintained continuously.

/**
 * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
 */
void resync(long nowMicros) {
  // if nextFreeTicket is in the past, resync to now
  if (nowMicros > nextFreeTicketMicros) {
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}
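The lazy refill in resync can be checked in isolation. The hypothetical ResyncModel below copies the arithmetic with a fixed coolDownIntervalMicros (as in SmoothBursty); the rate of 2 tokens per second and the 1-second bucket (maxPermits = 2) are example values chosen for illustration.

```java
/**
 * Stand-alone copy of the resync arithmetic (illustration only; the class name and the
 * example numbers are assumptions). coolDownIntervalMicros is fixed, as in SmoothBursty.
 */
final class ResyncModel {
    double storedPermits = 0.0;
    long nextFreeTicketMicros = 0L;
    final double maxPermits;
    final double coolDownIntervalMicros;

    ResyncModel(double maxPermits, double coolDownIntervalMicros) {
        this.maxPermits = maxPermits;
        this.coolDownIntervalMicros = coolDownIntervalMicros;
    }

    void resync(long nowMicros) {
        // Only time after nextFreeTicketMicros produces tokens; earlier time is already owed.
        if (nowMicros > nextFreeTicketMicros) {
            double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros;
            storedPermits = Math.min(maxPermits, storedPermits + newPermits);
            nextFreeTicketMicros = nowMicros;
        }
    }

    public static void main(String[] args) {
        ResyncModel m = new ResyncModel(2.0, 500_000.0); // 2 tokens/s, bucket holds 1 s worth
        m.resync(750_000L);    // 0.75 s idle: 1.5 tokens
        m.resync(10_750_000L); // 10 s more idle: capped at 2.0
        System.out.println(m.storedPermits);
    }
}
```

After 0.75 s of idle time the bucket holds 1.5 tokens; after a further 10 s it is capped at 2. Calling resync with a time earlier than a pre-consumed nextFreeTicketMicros changes nothing.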
Acquire function analysis

The acquire function also has two overloads, but only the one taking an integer argument needs analysis; the no-argument version is simply shorthand for acquire(1).

The acquire(int permits) function does three main things:

  • Reserves the requested permits, which returns the time to wait (possibly 0);
  • Sleeps for that waiting time;
  • Returns the time spent waiting, in seconds.

In this way, RateLimiter defines the skeleton of the permit-acquisition process and implements the generic steps itself; those steps call abstract methods that concrete subclasses override according to the needs of their algorithm (the Template Method pattern).
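A minimal sketch of this skeleton-plus-hook structure, using hypothetical names (SkeletonLimiter, FixedIntervalLimiter): the final acquire method fixes the three steps listed above, and each subclass only supplies reserveEarliestAvailable. For determinism the current time is passed in as a parameter, and the sleep is a simplified stand-in for Guava's uninterruptible sleep that restores the thread's interrupt flag instead.

```java
import java.util.concurrent.TimeUnit;

/** Template Method sketch: acquire() is the fixed skeleton, the reserve step is the hook. */
abstract class SkeletonLimiter {
    /** Skeleton: reserve, sleep, report the seconds waited. Subclasses cannot override it. */
    final double acquire(int permits, long nowMicros) {
        if (permits <= 0) {
            throw new IllegalArgumentException("Requested permits must be positive");
        }
        long momentAvailable = reserveEarliestAvailable(permits, nowMicros); // algorithm-specific
        long microsToWait = Math.max(momentAvailable - nowMicros, 0L);
        sleepMicros(microsToWait);
        return microsToWait / 1_000_000.0;
    }

    /** Primitive operation: returns the moment at which this request may proceed. */
    abstract long reserveEarliestAvailable(int permits, long nowMicros);

    /** Simplified stand-in for Guava's uninterruptible sleep. */
    void sleepMicros(long micros) {
        try {
            TimeUnit.MICROSECONDS.sleep(micros);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt instead of swallowing it
        }
    }
}

/** One concrete algorithm: fixed interval per permit, pre-consumption, no stored permits. */
final class FixedIntervalLimiter extends SkeletonLimiter {
    private final long intervalMicros;
    private long nextFreeTicketMicros = 0L;

    FixedIntervalLimiter(long intervalMicros) {
        this.intervalMicros = intervalMicros;
    }

    @Override
    long reserveEarliestAvailable(int permits, long nowMicros) {
        long moment = Math.max(nextFreeTicketMicros, nowMicros);
        nextFreeTicketMicros = moment + permits * intervalMicros; // the next caller pays
        return moment;
    }
}
```

With a 1,000 µs interval, acquire(1, 0) returns 0.0 and a following acquire(3, 0) returns 0.001, because it pays for the first request's token.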

The call process is shown as follows:

The reserveEarliestAvailable method in the orange block is implemented by a subclass (SmoothRateLimiter). The following sections analyze how subclasses of RateLimiter implement it.

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros; // the value computed by the previous request
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // tokens taken from the bucket
  double freshPermits = requiredPermits - storedPermitsToSpend; // tokens still to be generated
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros); // wait time for the fresh tokens

  // push nextFreeTicketMicros forward; the updated value is NOT what this call returns
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}
  • This function reserves requiredPermits tokens and returns the moment at which the caller may proceed; reserveAndGetWaitLength turns that moment into a wait time by subtracting the current time.

  • storedPermitsToSpend is the number of tokens that can be taken from the bucket, and freshPermits is the number that must still be generated. The waiting time is computed from these values and added to nextFreeTicketMicros.

  • Note that this function returns nextFreeTicketMicros as it was before the update (computed by the previous request), not the newly updated value. In other words, the current request pays for the previous request's pre-consumption. This is how RateLimiter supports pre-consumption and absorbs bursts. To disable pre-consumption, return the updated nextFreeTicketMicros instead.
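For the IM push case mentioned earlier (600 calls per 30 seconds, i.e. 20 per second or one every 50,000 µs), the difference between the two return values can be seen in the sketch below. ReserveVariants is a hypothetical class that ignores stored permits to keep the arithmetic visible, so in strict mode even the first request waits for its own tokens; a real variant would still serve stored tokens immediately.

```java
/**
 * Reserve logic with a pre-consumption switch (illustration only; stored permits are ignored).
 * preConsume = true returns the old nextFreeTicketMicros, as SmoothRateLimiter does;
 * preConsume = false returns the updated value, so each request waits for its own tokens.
 */
final class ReserveVariants {
    private final long stableIntervalMicros;
    private final boolean preConsume;
    private long nextFreeTicketMicros = 0L;

    ReserveVariants(long stableIntervalMicros, boolean preConsume) {
        this.stableIntervalMicros = stableIntervalMicros;
        this.preConsume = preConsume;
    }

    long reserveAndGetWaitLength(int permits, long nowMicros) {
        if (nowMicros > nextFreeTicketMicros) {
            nextFreeTicketMicros = nowMicros; // idle time is simply discarded in this sketch
        }
        long before = nextFreeTicketMicros;
        nextFreeTicketMicros = before + permits * stableIntervalMicros;
        long momentAvailable = preConsume ? before : nextFreeTicketMicros;
        return Math.max(momentAvailable - nowMicros, 0L);
    }
}
```

A 10-token request at time 0 proceeds at once in pre-consumption mode (the next caller waits 500 ms), but waits 500 ms itself in strict mode.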

Constructor for SmoothBursty
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
  super(stopwatch);
  this.maxBurstSeconds = maxBurstSeconds; // the bucket holds at most maxBurstSeconds worth of tokens
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond; // maximum number of stored tokens
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits; // rescale to the new capacity
  }
}
  • The maximum number of tokens the bucket can store is maxBurstSeconds * permitsPerSecond, i.e., the number of tokens generated in maxBurstSeconds seconds.
  • This parameter allows more flexible traffic control: for example, some interfaces are limited to 300 calls per 20 seconds and others to 50 calls per 45 seconds. Note that the public create method shown earlier hard-codes maxBurstSeconds to 1.0.
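The capacity arithmetic of doSetRate can be tried out with the hypothetical BurstyCapacity class below, which copies the logic shown above. It assumes maxBurstSeconds can be chosen freely, which the public create method does not allow. With maxBurstSeconds = 20 and 15 tokens per second the bucket holds 300 tokens, matching the 300 calls per 20 seconds example; changing the rate rescales the stored tokens proportionally.

```java
/** Re-implementation of SmoothBursty.doSetRate's arithmetic (illustration only). */
final class BurstyCapacity {
    final double maxBurstSeconds;
    double maxPermits = 0.0;    // 0.0 marks the initial state, as in Guava
    double storedPermits = 0.0;

    BurstyCapacity(double maxBurstSeconds) {
        this.maxBurstSeconds = maxBurstSeconds;
    }

    void setRate(double permitsPerSecond) {
        double oldMaxPermits = maxPermits;
        maxPermits = maxBurstSeconds * permitsPerSecond; // bucket holds maxBurstSeconds of tokens
        if (oldMaxPermits == Double.POSITIVE_INFINITY) {
            storedPermits = maxPermits; // avoid infinity / infinity = NaN
        } else {
            storedPermits = (oldMaxPermits == 0.0)
                    ? 0.0                                          // a new limiter starts empty
                    : storedPermits * maxPermits / oldMaxPermits;  // keep the same fill ratio
        }
    }
}
```

Halving or doubling the rate keeps the bucket equally full in relative terms: a half-full bucket stays half full.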
Abstract function analysis

The code above uses two abstract functions, coolDownIntervalMicros and storedPermitsToWaitTime. Let's analyze them now.

coolDownIntervalMicros function

This function returns the time needed to generate one token and is used to calculate how many tokens have been generated by the current time. As the UML diagram above shows, SmoothRateLimiter has two subclasses, SmoothBursty and SmoothWarmingUp, and each implements it differently.

SmoothBursty implements the coolDownIntervalMicros function as follows:

@Override
double coolDownIntervalMicros() {
  return stableIntervalMicros;
}

As you can see, the implementation is simple: it returns the stableIntervalMicros property, which is the interval between generating two consecutive tokens.

SmoothWarmingUp implements the coolDownIntervalMicros function as follows:

@Override
double coolDownIntervalMicros() {
  return warmupPeriodMicros / maxPermits;
}
  • The maxPermits property, introduced above, is the maximum capacity of the token bucket.
  • The warmupPeriodMicros attribute is specific to SmoothWarmingUp and represents the time it takes the token bucket to go from 0 to maxPermits tokens. warmupPeriodMicros / maxPermits is therefore the interval at which tokens are generated while the bucket fills up to maxPermits.
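To see what these quantities look like, the hypothetical WarmupParams class below recomputes them for one configuration. The formulas for coldIntervalMicros, maxPermits and slope follow Guava's SmoothWarmingUp.doSetRate, which this article does not show, so treat them as something to check against the source; coldFactor = 3.0 is the value used by the public create overload that takes a warm-up period.

```java
/**
 * Recomputes SmoothWarmingUp's derived parameters for one configuration (illustration only).
 * Formulas follow SmoothWarmingUp.doSetRate; coldFactor is passed in explicitly.
 */
final class WarmupParams {
    final double stableIntervalMicros;   // per-token time once fully warmed up
    final double coldIntervalMicros;     // per-token time when the bucket is full (cold)
    final double thresholdPermits;       // below this, tokens cost stableIntervalMicros
    final double maxPermits;             // bucket capacity
    final double slope;                  // extra microseconds per token above the threshold
    final double coolDownIntervalMicros; // refill interval, as in coolDownIntervalMicros()

    WarmupParams(double permitsPerSecond, double warmupPeriodMicros, double coldFactor) {
        stableIntervalMicros = 1_000_000.0 / permitsPerSecond;
        coldIntervalMicros = stableIntervalMicros * coldFactor;
        thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
        maxPermits = thresholdPermits
                + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
        slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
        coolDownIntervalMicros = warmupPeriodMicros / maxPermits;
    }
}
```

For 5 tokens per second and a 2-second warm-up this gives thresholdPermits = 5, maxPermits = 10, slope = 80,000 µs per token and coolDownIntervalMicros = 200,000 µs. With coldFactor = 3, maxPermits simplifies to warmupPeriodMicros / stableIntervalMicros, so the refill interval always equals stableIntervalMicros.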
storedPermitsToWaitTime function

This function returns the time it takes to consume tokens already stored in the token bucket.

SmoothBursty implements the storedPermitsToWaitTime function as follows:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0L;
}

Returning 0 means that consuming stored tokens takes no time; only tokens that still have to be generated make a request wait.

SmoothWarmingUp implements the storedPermitsToWaitTime function as follows:

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // measuring the integral on the right part of the function (the climbing line)
  if (availablePermitsAboveThreshold > 0.0) {
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // TODO(cpovirk): Figure out a good name for this variable.
    double length =
        permitsToTime(availablePermitsAboveThreshold)
            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0);
    permitsToTake -= permitsAboveThresholdToTake;
  }
  // measuring the integral on the left part of the function (the horizontal line)
  micros += (long) (stableIntervalMicros * permitsToTake);
  return micros;
}
  • The implementation is more involved. The core idea is that the cost of consuming stored tokens depends on the warm-up settings. It uses a new variable, thresholdPermits, the token threshold. When more tokens are stored than this value, the tokens between thresholdPermits and storedPermits are consumed in warm-up mode (the time to consume each token decreases gradually as they are used up). Tokens between 0 and thresholdPermits cost a fixed time each, namely stableIntervalMicros.

  • thresholdPermits depends on the warm-up period and the token generation rate: thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros. In other words, the threshold is half the number of tokens that can be generated during the warm-up period. As the source comments explain, the time needed to consume tokens above the threshold is the area of a trapezoid under the warm-up graph (i.e., an integral), which is not covered in detail here.

  • This design ensures that when a long time has passed since the last request, more tokens are stored in the bucket. When these tokens are consumed, the first ones take a long time and the following ones progressively less, until the per-token time reaches stableIntervalMicros, which produces the warm-up effect.
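The trapezoid can be checked numerically. WarmupCost below is a hypothetical, stand-alone copy of the storedPermitsToWaitTime code above with the parameters hard-coded to one example: 5 tokens per second, a 2-second warm-up and coldFactor 3, which under SmoothWarmingUp's formulas gives thresholdPermits = 5 and a slope of 80,000 µs per token.

```java
/** Stand-alone copy of SmoothWarmingUp.storedPermitsToWaitTime with fixed example parameters. */
final class WarmupCost {
    static final double STABLE_INTERVAL_MICROS = 200_000.0; // 5 tokens per second
    static final double THRESHOLD_PERMITS = 5.0;            // 0.5 * 2 s / 0.2 s
    static final double SLOPE = 80_000.0;                   // assumed from the example setup

    /** Per-token cost when `permits` tokens sit above the threshold (the climbing line). */
    static double permitsToTime(double permits) {
        return STABLE_INTERVAL_MICROS + permits * SLOPE;
    }

    static long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        double availablePermitsAboveThreshold = storedPermits - THRESHOLD_PERMITS;
        long micros = 0;
        if (availablePermitsAboveThreshold > 0.0) {
            double permitsAboveThresholdToTake =
                    Math.min(availablePermitsAboveThreshold, permitsToTake);
            // trapezoid: (left edge + right edge) * width / 2
            double length = permitsToTime(availablePermitsAboveThreshold)
                    + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
            micros = (long) (permitsAboveThresholdToTake * length / 2.0);
            permitsToTake -= permitsAboveThresholdToTake;
        }
        // rectangle: the remaining tokens cost stableIntervalMicros each
        micros += (long) (STABLE_INTERVAL_MICROS * permitsToTake);
        return micros;
    }
}
```

Draining a full (cold) bucket of 10 tokens costs 3 s, of which the 2 s spent above the threshold equals the warm-up period; the first cold token costs 0.56 s, while each token below the threshold costs 0.2 s.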

Implementation summary

  • In the token bucket algorithm, tokens are generated continuously and stored in the bucket, and a request must take a token from the bucket before it can proceed. So who generates and stores the tokens?

    • One option is a scheduled task that keeps generating tokens. But if an interface needs a per-user access limit and the system has 60,000 users, up to 60,000 scheduled tasks would be needed to maintain the token count of each bucket, which is very costly.

    • Guava instead computes the tokens on demand in resync, so no background task is needed. On top of the basic token bucket idea, its limiter also adds a token bucket with warm-up (SmoothWarmingUp). Throttled threads wait through the SleepingStopwatch utility class, which ultimately calls Thread.sleep(ms, ns). A thread that sleeps does not release the locks it holds, which matters in multithreaded programming; this is why acquire and tryAcquire compute the wait inside synchronized (mutex()) and sleep only after leaving that block.

    • Finally, the limiter reserves tokens in advance: the number of tokens the current request asks for does not affect its own waiting time, only the waiting time of the next request.
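On-demand computation also answers the 60,000-user question above: buckets need no background work, so a map of per-user buckets is enough. The sketch below is hypothetical (PerUserLimiters and LazyBucket are not Guava classes). Its bucket refills on demand like resync but, unlike RateLimiter, grants only tokens that already exist (no pre-consumption); new users start with a full bucket, and eviction of idle users is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-user limiter registry: one lazily refilled bucket per key, no threads. */
final class PerUserLimiters {
    /** Minimal bucket: refills on demand and grants only tokens that already exist. */
    static final class LazyBucket {
        private final double intervalMicros;
        private final double capacity;
        private double tokens;
        private long lastRefillMicros;

        LazyBucket(double permitsPerSecond, double capacity, long nowMicros) {
            this.intervalMicros = 1_000_000.0 / permitsPerSecond;
            this.capacity = capacity;
            this.tokens = capacity; // assumption: a new user starts with a full bucket
            this.lastRefillMicros = nowMicros;
        }

        synchronized boolean tryAcquire(long nowMicros) {
            if (nowMicros > lastRefillMicros) { // refill only when someone asks, like resync
                tokens = Math.min(capacity, tokens + (nowMicros - lastRefillMicros) / intervalMicros);
                lastRefillMicros = nowMicros;
            }
            if (tokens >= 1.0) {
                tokens -= 1.0;
                return true;
            }
            return false; // no pre-consumption: reject instead of borrowing
        }
    }

    private final Map<String, LazyBucket> buckets = new ConcurrentHashMap<>();
    private final double permitsPerSecond;
    private final double capacity;

    PerUserLimiters(double permitsPerSecond, double capacity) {
        this.permitsPerSecond = permitsPerSecond;
        this.capacity = capacity;
    }

    boolean tryAcquire(String userId, long nowMicros) {
        LazyBucket bucket = buckets.computeIfAbsent(
                userId, id -> new LazyBucket(permitsPerSecond, capacity, nowMicros));
        return bucket.tryAcquire(nowMicros);
    }

    int trackedUsers() {
        return buckets.size();
    }
}
```

With 1 token per second and a capacity of 2, a user can make two calls at once, the third is rejected, and one more is allowed a second later; other users are unaffected.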