I recently studied the Sentinel source code, and in this post I share what I learned.
Sentinel version
1.8.0
Fixed window algorithm
Let's start with the simplest traffic limiting algorithm.
Each window has a counter that counts traffic. If the counter plus the number of requests being applied for exceeds the preset QPS, the request is rejected.
The fixed window algorithm is simple, but it has a serious problem.
Suppose we stipulate that QPS cannot exceed 100. As shown in the figure above, 60 requests arrive at each of the points R1 and R2, which fall in two adjacent windows but less than one second apart, so the actual QPS is already greater than 100. Traffic limiting should be triggered at this point, but the fixed window algorithm only looks at the traffic in its own window and does not notice that the QPS has been exceeded.
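The boundary problem can be reproduced with a very small fixed-window limiter. This is only a sketch: the class FixedWindowLimiter, its method names, and the timestamps 900 and 1100 chosen for R1 and R2 are assumptions made for illustration, not Sentinel code.

```java
// Hypothetical fixed-window limiter, for illustration only (not part of Sentinel).
class FixedWindowLimiter {
    private final int maxQps;
    private final long windowLengthMs;
    private long windowStart = 0;
    private int counter = 0;

    FixedWindowLimiter(int maxQps, long windowLengthMs) {
        this.maxQps = maxQps;
        this.windowLengthMs = windowLengthMs;
    }

    // Returns true if `count` requests arriving at time `nowMs` are allowed.
    synchronized boolean tryAcquire(long nowMs, int count) {
        long start = nowMs - nowMs % windowLengthMs;
        if (start != windowStart) {
            // A new window begins: the counter starts from zero again.
            windowStart = start;
            counter = 0;
        }
        if (counter + count > maxQps) {
            return false;
        }
        counter += count;
        return true;
    }

    public static void main(String[] args) {
        FixedWindowLimiter limiter = new FixedWindowLimiter(100, 1000);
        boolean r1 = limiter.tryAcquire(900, 60);  // R1: late in window [0, 1000)
        boolean r2 = limiter.tryAcquire(1100, 60); // R2: early in window [1000, 2000)
        // Both are allowed, so 120 requests pass within 200 ms.
        System.out.println(r1 + " " + r2); // prints "true true"
    }
}
```

Each window on its own stays under 100, which is exactly why the limiter never notices the burst across the boundary.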
Sliding window algorithm
In this algorithm, the unit time is divided into multiple windows. Each time QPS is calculated, the traffic of the current window is summed with that of the preceding windows, which avoids the fixed window problem. The number of windows used depends on the window size and the unit time size. For example, in the figure above each window is 500 ms and limiting uses 1 s as the unit of time, so each calculation uses current + last.
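A minimal sketch of this idea, using two 500 ms buckets over a 1 s interval, might look as follows. The class SlidingWindowLimiter and everything in it are hypothetical names for illustration; Sentinel's real implementation is discussed further down and differs in detail.

```java
import java.util.Arrays;

// Hypothetical sliding-window limiter with a fixed number of buckets (not Sentinel code).
class SlidingWindowLimiter {
    private final int maxQps;
    private final long bucketLengthMs;
    private final long[] bucketStarts; // start time of the bucket held in each slot, -1 if empty
    private final int[] counts;        // traffic counted in each slot

    SlidingWindowLimiter(int maxQps, long intervalMs, int bucketCount) {
        this.maxQps = maxQps;
        this.bucketLengthMs = intervalMs / bucketCount;
        this.bucketStarts = new long[bucketCount];
        this.counts = new int[bucketCount];
        Arrays.fill(bucketStarts, -1L);
    }

    // Returns true if `count` requests arriving at time `nowMs` are allowed.
    synchronized boolean tryAcquire(long nowMs, int count) {
        long start = nowMs - nowMs % bucketLengthMs;
        int idx = (int) ((nowMs / bucketLengthMs) % counts.length);
        if (bucketStarts[idx] != start) {
            // The slot is empty or holds an expired bucket: reuse it for the current bucket.
            bucketStarts[idx] = start;
            counts[idx] = 0;
        }
        long intervalMs = bucketLengthMs * counts.length;
        int total = 0;
        for (int i = 0; i < counts.length; i++) {
            // Only buckets that started within the last interval are summed.
            if (bucketStarts[i] >= 0 && nowMs - bucketStarts[i] < intervalMs) {
                total += counts[i];
            }
        }
        if (total + count > maxQps) {
            return false;
        }
        counts[idx] += count;
        return true;
    }

    public static void main(String[] args) {
        SlidingWindowLimiter limiter = new SlidingWindowLimiter(100, 1000, 2);
        System.out.println(limiter.tryAcquire(900, 60));  // true
        System.out.println(limiter.tryAcquire(1100, 60)); // false: the bucket [500, 1000) still counts
    }
}
```

With the same two bursts as in the fixed window example, the second one is now rejected because the previous bucket is still part of the sum.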
Thinking through the implementation
Now that you understand the idea of the algorithm, the next question is how to implement it.
- First, we need a timeline like the one in the figure above to record the time windows. It can be implemented with an array.
- With the timeline in place, consider the time window itself. Each time window must have a thread-safe counter and the start time of the window.
// The timeline
List<Window> timeline = new ArrayList<>();
// Size of each window
int windowTime;
// Time window
class Window {
    Timestamp startTime;
    AtomicInteger counter;
}
But on closer thought, some problems remain:
- Time keeps increasing. Should the array grow without bound along with it?
- Old time windows (for example, from a few seconds ago) are never used in later calculations. How do we clean up these useless windows?
How the sliding window algorithm is implemented in Sentinel
With those questions in mind, let's see how Sentinel does it.
LeapArray
LeapArray is the core class of Sentinel's sliding window algorithm. Start with its core member variables.
public abstract class LeapArray<T> {
    // Length of the statistics interval in ms; for QPS calculation this is 1000
    protected int intervalInMs;
    // Number of samples (windows) per interval
    protected int sampleCount;
    // Window length in ms; this value = intervalInMs / sampleCount
    protected int windowLengthInMs;
    // Array that stores the time windows
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}
When single-node traffic limiting collects QPS statistics, the default values are sampleCount = 2, intervalInMs = 1000, and windowLengthInMs = 500
LeapArray#calculateTimeIdx
The general idea is the same as ours: an array serves as the timeline, and each element represents one time window.
The size of the array in Sentinel is fixed. The method LeapArray#calculateTimeIdx determines the position in the array for a timestamp, that is, the window slot the timestamp maps to.
How should we understand this method?
Let's plug in some numbers, with windowLengthInMs = 500 ms (each time window is 500 ms long).
If timestamps start at 0, the time windows are [0, 500), [500, 1000), [1000, 1500), …
Set aside timeId % array.length() and the array length for a moment. Assuming the current timeMillis = 601, timeMillis / windowLengthInMs gives 1, which identifies the time window the timestamp belongs to.
Since the length of the array is fixed, the remainder timeId % array.length() is then used to map that window to a position in the array.
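The arithmetic can be tried out in isolation. The sketch below only reuses the expressions quoted in this article (timeMillis / windowLengthInMs, timeId % array.length(), and timeMillis - timeMillis % windowLengthInMs); the class TimeIdxDemo and the extra parameters are assumptions for illustration, not Sentinel's LeapArray.

```java
// Standalone sketch of the index arithmetic (hypothetical class, not Sentinel's LeapArray).
class TimeIdxDemo {
    // Maps a timestamp to a slot in a fixed-size array of windows.
    static int calculateTimeIdx(long timeMillis, int windowLengthInMs, int arrayLength) {
        long timeId = timeMillis / windowLengthInMs;
        return (int) (timeId % arrayLength);
    }

    // Start time of the window that contains the timestamp.
    static long calculateWindowStart(long timeMillis, int windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    public static void main(String[] args) {
        // windowLengthInMs = 500 and an array of length 2, as in the defaults mentioned above.
        for (long t : new long[] {601, 1001, 1601}) {
            System.out.println(t + " -> idx " + calculateTimeIdx(t, 500, 2)
                    + ", windowStart " + calculateWindowStart(t, 500));
        }
        // 601  -> idx 1, windowStart 500
        // 1001 -> idx 0, windowStart 1000
        // 1601 -> idx 1, windowStart 1500
    }
}
```

Note that 601 and 1601 land in the same slot. This is how a fixed-size array can follow an ever-growing timeline: old windows are overwritten once their slot is needed again.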
LeapArray#currentWindow
The structure of the window in Sentinel is similar to what we sketched above. The counter is a generic type, which makes it more flexible.
public class WindowWrap<T> {
    /**
     * Time length of a single window bucket in milliseconds.
     */
    private final long windowLengthInMs;
    /**
     * Start timestamp of the window in milliseconds.
     */
    private long windowStart;
    /**
     * Statistic data.
     */
    private T value;
    // omit...
}
Back to currentWindow: this method finds or creates the window corresponding to the timestamp passed in.
The source has many comments on this method; I removed some of them.
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    int idx = calculateTimeIdx(timeMillis);
    // Calculate current bucket start time.
    long windowStart = calculateWindowStart(timeMillis);
    /*
     * Get bucket item at given time from the array.
     *
     * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
     * (2) Bucket is up-to-date, then just return the bucket.
     * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
     */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                // Successfully updated, return the created bucket.
                return window;
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            return old;
        } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    // Successfully get the update lock, now we reset the bucket.
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}
The logic of the method is as follows.
It first does two things:
- Computes the index of the timestamp in the array, using calculateTimeIdx.
- Computes windowStart (the window start time) of the timestamp, as timeMillis - timeMillis % windowLengthInMs. This value is used later.
It then enters a while(true) loop and looks up the window with WindowWrap<T> old = array.get(idx). There are four cases:
- old == null
Create the window and add it to the array, then return the newly created window. Several threads may add array elements at the same time, so thread safety must be guaranteed, which is why the array is an AtomicReferenceArray.
- windowStart == old.windowStart()
The window already exists, so just return it.
- windowStart > old.windowStart()
Reset the windowStart and counter of the current window. This is also a multithreaded operation, so updateLock.tryLock() is used.
After a closer look at the code, I had a question. I don't think the lock alone is a sufficient guarantee here. Two threads could both decide that an update is needed; after the first thread resets the window and releases the lock, the second thread acquires the lock as well, so resetWindowTo runs more than once. I think the condition needs to be checked again after tryLock succeeds. I have submitted an issue to Sentinel about this.
- windowStart < old.windowStart()
Normally this branch is never reached, as the comment in the source code above explains.
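The re-check suggested for the windowStart > old.windowStart() case can be sketched as follows. This is not Sentinel's code and not a proposed patch: RecheckedBucket and resetIfStale are hypothetical names, and the class is a deliberately simplified stand-in for a single array slot.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for one array slot (not Sentinel's WindowWrap).
class RecheckedBucket {
    private final ReentrantLock updateLock = new ReentrantLock();
    private final AtomicInteger counter = new AtomicInteger();
    private volatile long windowStart;

    RecheckedBucket(long windowStart) {
        this.windowStart = windowStart;
    }

    void add(int n) {
        counter.addAndGet(n);
    }

    int count() {
        return counter.get();
    }

    long windowStart() {
        return windowStart;
    }

    // Returns true only if this call actually reset the bucket.
    boolean resetIfStale(long newWindowStart) {
        if (!updateLock.tryLock()) {
            return false; // another thread holds the lock; the caller would retry
        }
        try {
            // Re-check under the lock: another thread may already have reset the bucket.
            if (newWindowStart <= windowStart) {
                return false;
            }
            counter.set(0);
            windowStart = newWindowStart;
            return true;
        } finally {
            updateLock.unlock();
        }
    }
}
```

For example, if one thread calls resetIfStale(1000) on a bucket that started at 500 and adds traffic, a second thread arriving late with the same resetIfStale(1000) finds the condition no longer true and leaves the new count untouched.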
LeapArray#values
As mentioned above, how many windows are used to calculate traffic depends on the window size and the unit time size.
Given a timestamp, this method finds all the time windows needed for the calculation.
public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>();
    }
    int size = array.length();
    List<T> result = new ArrayList<T>(size);
    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}
The logic of values is straightforward: it iterates over the array and adds the value of each window that is still valid for the given time to the result list.
The method to focus on is isWindowDeprecated.
Let's plug in the same numbers as above, with each window 500 ms long. If the timestamp is 1601, the corresponding windowStart is 1500, and (1601 - 1500 > 1000) = false, so this window is valid. The previous window, with windowStart = 1000, is also valid. Any window further back is invalid.
intervalInMs is the time span over which traffic is limited. Limiting can be computed over a period of 1 s or 60 s, for example.
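The worked numbers can be checked with a few lines of code. The comparison is the one from isWindowDeprecated above; the class WindowDeprecationDemo and the flattened parameter list are assumptions made so that the snippet runs on its own.

```java
// Standalone check of the deprecation rule (hypothetical class, not Sentinel code).
class WindowDeprecationDemo {
    // Same comparison as isWindowDeprecated above, with the fields passed in as parameters.
    static boolean isWindowDeprecated(long time, long windowStart, int intervalInMs) {
        return time - windowStart > intervalInMs;
    }

    public static void main(String[] args) {
        long now = 1601;
        int intervalInMs = 1000;
        for (long windowStart : new long[] {1500, 1000, 500}) {
            System.out.println("windowStart " + windowStart + " deprecated: "
                    + isWindowDeprecated(now, windowStart, intervalInMs));
        }
        // windowStart 1500 deprecated: false
        // windowStart 1000 deprecated: false
        // windowStart 500 deprecated: true
    }
}
```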
Sentinel current limiting idea
Once you understand the details of LeapArray#currentWindow and LeapArray#values, you can work out how to implement limiting.
First, find the relevant windows for the current timestamp, then decide whether to let the request pass based on the total traffic in those windows plus the amount of traffic being applied for.
- If it does not pass, an exception is thrown.
- If it passes, the amount that passed is added to the corresponding window.
Sentinel current limiting implementation
Sentinel follows basically the same idea, but the logic is a little more complicated. Below are some code locations that you can debug yourself.
Sentinel current limiting check
As explained in the Sentinel documentation, the class responsible for flow limiting is FlowSlot, which uses FlowRuleChecker to check whether the current resource needs to be limited.
FlowSlot#entry
FlowRuleChecker#checkFlow
Based on the ControlBehavior set in the FlowRule, limiting here is done by DefaultController#canPass. As the following figure shows, it decides whether to limit by checking whether the current traffic count plus the number being applied for exceeds the preset count.
Note: prioritized = false when SphU.entry is used, and true when SphU.entryWithPriority is used. node.tryOccupyNext returns how long to wait in order to occupy tokens from a future time window (waitInMs in the figure above). If that is less than the specified timeout, the number of pending requests is recorded and sleep(waitInMs) is executed. The resulting PriorityWaitException is caught and handled by the outer layer, so the user logic runs without the user being aware of it.
node.passQps() is called when the grade of the rule is FLOW_GRADE_QPS. The implementation called is StatisticNode#passQps, as shown below.
- rollingCounterInSecond.getWindowIntervalInSec(): QPS is calculated over 1 second.
- rollingCounterInSecond.pass(): when calculating QPS, returns the pass count of at most two windows (currentWindow + lastWindow).
rollingCounterInSecond#pass
It first checks whether the current time window needs to be created, then finds the relevant windows and calculates the total traffic.
Sentinel Request Record
Code location: StatisticSlot#entry. fireEntry runs the checks for the rules we configured (such as the flow limiting above).
If the checks throw no exception, the thread count and the number of passed requests are recorded (this is the data that the flow limiting check relies on).
Cluster current-limiting
What is the use of cluster traffic limiting
Before cluster limiting existed, suppose you wanted to limit the QPS of an entire service to a certain value. For example, with 10 instances of a server and a desired total QPS of at most 100, we would set the QPS limit of each instance to 10.
Ideally, this keeps the total QPS at 100. However, if traffic is not evenly distributed across the servers, some of them may start limiting before the total reaches 100.
In this case, Sentinel's cluster flow limiting is needed.
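The effect of uneven distribution is easy to quantify. In the sketch below, the class PerInstanceLimitDemo and the per-instance arrival numbers are made up for illustration; only the setup (10 instances, a limit of 10 each) comes from the example above.

```java
// Hypothetical calculation for the per-instance limit example (not Sentinel code).
class PerInstanceLimitDemo {
    // Number of requests rejected in one second when each instance enforces its own limit.
    static int rejected(int[] arrivalsPerInstance, int perInstanceLimit) {
        int rejected = 0;
        for (int arrivals : arrivalsPerInstance) {
            rejected += Math.max(0, arrivals - perInstanceLimit);
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 10 instances receive 60 requests in total within one second, unevenly distributed.
        int[] arrivals = {30, 10, 5, 5, 2, 2, 2, 2, 1, 1};
        // The first instance rejects 20 requests although the total of 60 is below 100.
        System.out.println(rejected(arrivals, 10)); // prints 20
    }
}
```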
Cluster traffic limiting principle
Due to space constraints, we will not discuss how to set up cluster limiting here, but rather how Sentinel implements it.
The idea is simple: select a Token Server. After cluster limiting is enabled, all clients ask the Token Server before proceeding, and the server decides whether the current request should be limited. The implementation details differ slightly from single-machine limiting, but the core algorithm still uses LeapArray.
Here are a few source code locations for interested readers:
Client side: FlowRuleChecker#canPassCheck decides, based on the rule, whether to use local or cluster limiting.
Server side: DefaultTokenService#requestToken
Flow limiting under concurrency
After reading through the single-machine and cluster limiting code, I noticed a problem. The limiting process can be simplified as follows:
// Pseudo code
// The maximum QPS
int maxCount;
// The amount of traffic currently being requested
int acquireCount;
int passQps = getPassQPS();
if (passQps + acquireCount <= maxCount) {
    addPass(acquireCount);
} else {
    // Current limiting
}
Because there is no concurrency control, several threads can all satisfy passQps + acquireCount <= maxCount at the same time and then each add to the traffic count. In that case there is no guarantee that QPS stays within maxCount: under concurrency, the actual traffic can exceed the preset QPS.
This is certainly not a bug. The absence of concurrency control here may be for performance reasons, a compromise in which both performance and accuracy are acceptable.
So if the actual QPS turns out higher than the preset value in practice, concurrency may be the cause.
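To make the overshoot concrete, the sketch below replays the problematic interleaving step by step on a single thread, so the result is deterministic. For contrast it also shows a compare-and-set variant that checks and adds in one atomic step. Both methods and the class CheckThenActDemo are hypothetical illustrations; the atomic variant is not what Sentinel does, and it ignores the sliding window entirely.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demonstration of the check-then-act race (not Sentinel code).
class CheckThenActDemo {
    // Replays the unsafe interleaving of two threads A and B in a fixed order.
    static int unsafeInterleaving(int maxCount, int alreadyPassed, int acquireCount) {
        int pass = alreadyPassed;
        boolean aAllowed = pass + acquireCount <= maxCount; // thread A checks
        boolean bAllowed = pass + acquireCount <= maxCount; // thread B checks before A has added
        if (aAllowed) {
            pass += acquireCount; // thread A adds
        }
        if (bAllowed) {
            pass += acquireCount; // thread B adds based on its stale check
        }
        return pass;
    }

    // Alternative for comparison: the check and the add happen in one atomic step.
    static boolean tryAcquireAtomically(AtomicInteger pass, int maxCount, int acquireCount) {
        while (true) {
            int current = pass.get();
            if (current + acquireCount > maxCount) {
                return false;
            }
            if (pass.compareAndSet(current, current + acquireCount)) {
                return true;
            }
            // Another thread changed the counter in between: re-read and check again.
        }
    }

    public static void main(String[] args) {
        System.out.println(unsafeInterleaving(100, 98, 2)); // prints 102, above the limit of 100

        AtomicInteger pass = new AtomicInteger(98);
        System.out.println(tryAcquireAtomically(pass, 100, 2)); // true
        System.out.println(tryAcquireAtomically(pass, 100, 2)); // false
    }
}
```

The atomic variant never overshoots, but every request then contends on a single counter, which is the kind of cost the trade-off described above avoids.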
Demo
- Single-node current limiting: github.com/TavenYin/ta…
- Cluster traffic limiting: github.com/TavenYin/ta…