Today I'll summarize Sentinel's fast-fail and uniform-queueing (leaky bucket) flow control algorithms. Because the math behind the token bucket algorithm used by WarmUpController and WarmUpRateLimiterController is more involved, I'll cover those two separately in a later post. So the focus here is DefaultController and RateLimiterController.

Traffic limiting policy entry

First look at the FlowRuleUtil class. Its generateRater method creates the controller for the configured policy. The logic is straightforward:

```java
private static TrafficShapingController generateRater(FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                // Warm-up: token bucket algorithm
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                // Uniform queueing: leaky bucket algorithm
                return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                // Warm-up combined with uniform queueing
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    return new DefaultController(rule.getCount(), rule.getGrade());
}
```

Fast fail: DefaultController

The default flow control algorithm code is as follows:

```java
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    // Block when the request would push usage over the threshold.
    if (curCount + acquireCount > count) {
        // For QPS rules, a prioritized request may try to occupy tokens
        // from a future time window instead of failing immediately.
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime = TimeUtil.currentTimeMillis();
            long waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass
                // after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}
```

Take a look at the avgUsedTokens method involved:

```java
private int avgUsedTokens(Node node) {
    if (node == null) {
        return DEFAULT_AVG_USED_TOKENS;
    }
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
}
```

This returns the current thread count stored in the statistics node if the threshold type is THREAD, or the passed QPS if the threshold type is QPS.

Now back to the canPass method above. Its main logic: read the current node's statistics, add the number of tokens already in use to the number requested, and pass the request immediately if the sum does not exceed the configured threshold.

If the sum exceeds the threshold, there is one special case: when the threshold type is QPS and the request is marked as prioritized, it gets a chance to borrow capacity instead of failing at once. Otherwise false is returned.

That special handling works as follows: first try to occupy tokens from an upcoming time window and compute the required wait. If the wait is below the configured occupy timeout, the request sleeps for that long and is then released at the appointed time; otherwise false is returned.
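The fast-fail decision itself boils down to a single comparison. Here is a minimal sketch of the idea, not Sentinel's actual class; all names are illustrative:

```java
// Minimal sketch of the fast-fail check for a fixed threshold.
// Illustrative only; not Sentinel's DefaultController API.
public class FastFailSketch {
    private final int count; // configured threshold (QPS or thread count)

    public FastFailSketch(int count) {
        this.count = count;
    }

    // curCount: tokens already in use in the current window.
    // Reject as soon as the request would push usage over the threshold.
    public boolean canPass(int curCount, int acquireCount) {
        return curCount + acquireCount <= count;
    }
}
```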

As the code shows, after waiting for the computed period the request throws PriorityWaitException to signal that it should be released. That exception is handled in StatisticSlot; the relevant entry method is:

```java
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
        // ...
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }
        if (resourceWrapper.getEntryType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // ...
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        throw e;
    }
}
```

Stripping away the unrelated code, you can see that the catch block for PriorityWaitException records statistics but does not rethrow, so the request is ultimately allowed to pass.

Uniform queueing (leaky bucket): RateLimiterController

First, a quick recap of the leaky bucket algorithm (classically drawn as a bucket with a small hole in the bottom):

The idea: water (requests) flows into the bucket, and the bucket leaks at a fixed rate. When the inflow is too large, the excess water (requests) overflows and is discarded, which protects the system from exceeding its capacity.
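The concept can be sketched in a few lines. This is a generic leaky bucket, not Sentinel's implementation; the names and structure are illustrative:

```java
// Generic leaky-bucket sketch: the bucket drains at a fixed rate and
// anything that would overflow its capacity is rejected immediately.
// Illustrative only; not Sentinel's RateLimiterController.
public class LeakyBucketSketch {
    private final long capacity;    // max queued "water"
    private final double leakPerMs; // drain rate, units per millisecond
    private double water = 0;
    private long lastLeakMs;

    public LeakyBucketSketch(long capacity, double leakPerMs, long nowMs) {
        this.capacity = capacity;
        this.leakPerMs = leakPerMs;
        this.lastLeakMs = nowMs;
    }

    public boolean tryAcquire(long nowMs) {
        // Drain whatever has leaked since the last call.
        water = Math.max(0, water - (nowMs - lastLeakMs) * leakPerMs);
        lastLeakMs = nowMs;
        if (water + 1 > capacity) {
            return false; // bucket full: overflow, reject the request
        }
        water += 1;
        return true;
    }
}
```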

The shaping effect of Sentinel's leaky bucket mode is that bursty incoming traffic is flattened into a steady, evenly spaced outflow.

Look at the canPass method of RateLimiterController:

```java
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    if (acquireCount <= 0) {
        return true;
    }
    if (count <= 0) {
        return false;
    }

    long currentTime = TimeUtil.currentTimeMillis();
    // Time needed to "generate" the requested tokens:
    // (1.0 / count * 1000) is the interval, in ms, between two tokens.
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    // Expected pass time of this request.
    long expectedTime = costTime + latestPassedTime.get();

    if (expectedTime <= currentTime) {
        // Tokens are sufficient; record the current time as the latest pass time.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        // Tokens are not sufficient; the request has to queue.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            // The wait would exceed the configured max queueing time: reject.
            return false;
        } else {
            long oldTime = latestPassedTime.addAndGet(costTime);
            try {
                // Re-check the wait time after updating latestPassedTime.
                waitTime = oldTime - TimeUtil.currentTimeMillis();
                if (waitTime > maxQueueingTimeMs) {
                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                // In a race condition waitTime may be <= 0.
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                }
                return true;
            } catch (InterruptedException e) {
            }
        }
    }
    return false;
}
```

Both Sentinel's token bucket and leaky bucket implementations draw on the design of Guava's RateLimiter.

To summarize: the number of tokens requested (acquireCount) is multiplied by the per-token generation interval to get the generation time this request needs, and adding the last pass time gives the expected pass time. If the expected pass time is not later than the current time, capacity is sufficient and the request passes immediately. If it is later than the current time, the system is over capacity and the request must wait; whether it queues or is rejected depends on the configured maximum queueing time.
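A quick worked example of the pacing arithmetic, using hypothetical values of count = 100 QPS and acquireCount = 1:

```java
// Worked example of the rate-limiter pacing arithmetic.
// With count = 100 QPS and acquireCount = 1, each token "costs" 10 ms.
public class PacingMath {
    public static long costTime(int acquireCount, double count) {
        // Same formula as in canPass above.
        return Math.round(1.0 * acquireCount / count * 1000);
    }

    public static void main(String[] args) {
        long cost = costTime(1, 100);                 // 10 ms per request
        long latestPassedTime = 5_000;                // last request passed at t = 5000 ms
        long expectedTime = cost + latestPassedTime;  // 5010 ms

        long currentTime = 5_003;
        long waitTime = expectedTime - currentTime;   // 7 ms: queue briefly, then pass
        System.out.println(cost + " " + expectedTime + " " + waitTime);
    }
}
```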

It is important to note that uniform mode is limited in that it only supports QPS up to 1000. We can look at the corresponding statement:

```java
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
long expectedTime = costTime + latestPassedTime.get();
```

With acquireCount = 1, the token generation interval for a given threshold is simply 1000 / count milliseconds.

Therefore, when the threshold count is greater than 2000, the computed interval rounds to 0 ms and the subsequent pacing checks become meaningless; for counts between 1000 and 2000 the interval still rounds to 1 ms, which paces requests no faster than 1000 QPS. That is why Sentinel's uniform mode only supports QPS thresholds up to 1000.
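The rounding behavior is easy to verify. With acquireCount = 1, the per-token interval Math.round(1000.0 / count) degenerates once count passes 1000:

```java
// Demonstrates why uniform (rate-limiter) mode effectively caps out at 1000 QPS.
public class IntervalDemo {
    public static long interval(int count) {
        // Per-token interval with acquireCount = 1, as in canPass.
        return Math.round(1.0 * 1 / count * 1000);
    }

    public static void main(String[] args) {
        System.out.println(interval(1000)); // 1 ms: one request per millisecond
        System.out.println(interval(1500)); // still rounds to 1 ms -> effectively 1000 QPS
        System.out.println(interval(2000)); // 0.5 ms also rounds to 1 ms
        System.out.println(interval(3000)); // rounds to 0 ms: pacing check is meaningless
    }
}
```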

Author: Mr. White oh

Link: www.cnblogs.com/mrxiaobai-w…