Preface
This article looks at Sentinel's StatisticSlot.
StatisticSlot
com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.java
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {
        try {
            fireEntry(context, resourceWrapper, node, count, args);
            node.increaseThreadNum();
            node.addPassRequest();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest();
            }
        } catch (BlockException e) {
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockedQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockedQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockedQps();
            }

            throw e;
        } catch (Throwable e) {
            context.getCurEntry().setError(e);

            // Should not happen
            node.increaseExceptionQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps();
            }

            throw e;
        }
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();

        if (context.getCurEntry().getError() == null) {
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }

            node.rt(rt);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().rt(rt);
            }

            node.decreaseThreadNum();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.rt(rt);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // error may happen
            // node.rt(-2);
        }

        fireExit(context, resourceWrapper, count);
    }
}
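- StatisticSlot overrides the entry and exit methods
- The entry method calls fireEntry first. If the downstream slots let the request through, it calls the node's increaseThreadNum and addPassRequest; if a BlockException is thrown, it calls increaseBlockedQps; for any other Throwable, it calls increaseExceptionQps. In both failure cases the error is stored on the current entry and rethrown
- The exit method, when the current entry has no recorded error, records the response time (capped at Constants.TIME_DROP_VALVE) and calls the node's decreaseThreadNum; in either case it then calls fireExit to trigger the rest of the exit chain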
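The accounting described in the list above can be sketched without Sentinel on the classpath. This is only a minimal model under stated assumptions: `StatisticSketch`, `SketchNode`, `BlockedException` and `MAX_RT_MS` are hypothetical names invented for illustration, the `Runnable` stands in for `fireEntry`, and the cap value is an assumed number rather than a quotation of `Constants.TIME_DROP_VALVE`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class StatisticSketch {
    /** Hypothetical stand-in for Sentinel's BlockException. */
    static class BlockedException extends RuntimeException {
        BlockedException(String message) { super(message); }
    }

    /** Hypothetical, simplified counters; not Sentinel's StatisticNode. */
    static class SketchNode {
        final AtomicInteger threads = new AtomicInteger();
        final AtomicLong pass = new AtomicLong();
        final AtomicLong block = new AtomicLong();
        final AtomicLong exception = new AtomicLong();
        final AtomicLong totalRt = new AtomicLong();
    }

    /** Assumed cap on a recorded response time, in milliseconds. */
    static final long MAX_RT_MS = 4900;

    /** Runs the downstream checks first, then records the outcome. */
    static void entry(SketchNode node, Runnable downstream) {
        try {
            downstream.run();                 // plays the role of fireEntry
            node.threads.incrementAndGet();   // reached only if nothing was thrown
            node.pass.incrementAndGet();
        } catch (BlockedException e) {
            node.block.incrementAndGet();     // rejected by a later check
            throw e;
        } catch (RuntimeException e) {
            node.exception.incrementAndGet(); // unexpected failure
            throw e;
        }
    }

    /** Records the capped response time and releases the thread slot. */
    static void exit(SketchNode node, long elapsedMs) {
        node.totalRt.addAndGet(Math.min(elapsedMs, MAX_RT_MS));
        node.threads.decrementAndGet();
    }

    public static void main(String[] args) {
        SketchNode node = new SketchNode();
        entry(node, () -> { });               // passes
        try {
            entry(node, () -> { throw new BlockedException("limit"); });
        } catch (BlockedException expected) {
            // the blocked call is counted but never holds a thread slot
        }
        System.out.println(node.pass.get() + " " + node.block.get() + " " + node.threads.get());
        exit(node, 10_000);                   // longer than the cap
        System.out.println(node.totalRt.get() + " " + node.threads.get());
    }
}
```

In this sketch a blocked call increments only the block counter, so the thread count reflects admitted calls alone, and an over-long call contributes no more than the cap to the response-time total.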
StatisticNode
com/alibaba/csp/sentinel/node/StatisticNode.java
public class StatisticNode implements Node {

    private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount,
        IntervalProperty.INTERVAL);

    private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);

    private AtomicInteger curThreadNum = new AtomicInteger(0);

    private long lastFetchTime = -1;

    //...

    @Override
    public int curThreadNum() {
        return curThreadNum.get();
    }

    @Override
    public void addPassRequest() {
        rollingCounterInSecond.addPass();
        rollingCounterInMinute.addPass();
    }

    @Override
    public void increaseBlockedQps() {
        rollingCounterInSecond.addBlock();
        rollingCounterInMinute.addBlock();
    }

    @Override
    public void increaseExceptionQps() {
        rollingCounterInSecond.addException();
        rollingCounterInMinute.addException();
    }

    @Override
    public void increaseThreadNum() {
        curThreadNum.incrementAndGet();
    }

    @Override
    public void decreaseThreadNum() {
        curThreadNum.decrementAndGet();
    }
}
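- StatisticNode keeps curThreadNum, an AtomicInteger that records how many threads are currently inside each node; thread-based flow control reads this value. Pass, block, and exception counts go to the two rolling counters, rollingCounterInSecond and rollingCounterInMinute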
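As a rough illustration of why an atomic counter suits this job, the sketch below counts active threads around a unit of work from several threads at once. `ConcurrencyGauge`, `track`, `peak` and `runWorkers` are hypothetical names, not Sentinel APIs, and the try/finally pairing is a simplification: Sentinel itself pairs the increment and decrement across the slot's entry and exit methods.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencyGauge {
    private final AtomicInteger current = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();

    /** Runs the task while it is counted as an active thread. */
    void track(Runnable task) {
        int now = current.incrementAndGet();
        peak.accumulateAndGet(now, Math::max); // remember the highest value seen
        try {
            task.run();
        } finally {
            current.decrementAndGet();         // always release the slot
        }
    }

    int current() { return current.get(); }

    int peak() { return peak.get(); }

    /** Runs the given number of tasks concurrently and returns the gauge. */
    static ConcurrencyGauge runWorkers(int workers) {
        ConcurrencyGauge gauge = new ConcurrencyGauge();
        CountDownLatch allInside = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> gauge.track(() -> {
                allInside.countDown();
                try {
                    // hold the slot until every worker has entered
                    allInside.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return gauge;
    }

    public static void main(String[] args) {
        ConcurrencyGauge gauge = runWorkers(4);
        System.out.println("peak=" + gauge.peak() + " current=" + gauge.current());
    }
}
```

Because every worker waits inside `track` until all of them have entered, the peak equals the worker count, and the current value returns to zero once they finish.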
DefaultController
com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.java
public class DefaultController implements Controller {

    double count = 0;
    int grade = 0;

    public DefaultController(double count, int grade) {
        super();
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            return false;
        }

        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return -1;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps();
    }
}
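- The canPass method decides whether a request is admitted, based on the node's statistics: it rejects the request when the current usage plus acquireCount exceeds the configured count
- If grade is FLOW_GRADE_THREAD, the current usage is curThreadNum; otherwise it is passQps, truncated to an int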
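The decision rule above can be tried out in isolation. The following is a sketch only: `CanPassSketch`, `Grade`, `Stats` and `fixed` are hypothetical names standing in for the rule grade and the node, and the method mirrors the comparison shown in DefaultController rather than calling Sentinel.

```java
class CanPassSketch {
    /** Hypothetical grades mirroring the thread/QPS distinction. */
    enum Grade { THREAD, QPS }

    /** Hypothetical read-only view of a node's statistics. */
    interface Stats {
        int curThreadNum();
        double passQps();
    }

    /** Admits the request unless usage plus the requested tokens exceeds the limit. */
    static boolean canPass(Stats stats, Grade grade, double limit, int acquireCount) {
        int used = stats == null
            ? -1                               // no node: same fallback as the source
            : grade == Grade.THREAD
                ? stats.curThreadNum()
                : (int) stats.passQps();       // QPS is truncated, not rounded
        return used + acquireCount <= limit;
    }

    /** Helper that returns fixed statistics for the examples. */
    static Stats fixed(int threads, double qps) {
        return new Stats() {
            @Override public int curThreadNum() { return threads; }
            @Override public double passQps() { return qps; }
        };
    }

    public static void main(String[] args) {
        System.out.println(canPass(fixed(5, 0), Grade.THREAD, 5, 1)); // false: 5 + 1 > 5
        System.out.println(canPass(fixed(4, 0), Grade.THREAD, 5, 1)); // true: 4 + 1 <= 5
        System.out.println(canPass(fixed(0, 9.9), Grade.QPS, 10, 1)); // true: 9 + 1 <= 10
        System.out.println(canPass(null, Grade.QPS, 0, 1));           // true: -1 + 1 <= 0
    }
}
```

Two details follow from the comparison itself: the cast truncates 9.9 QPS to 9, and the -1 fallback means a single-token request with no node passes for any non-negative limit.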
DefaultSlotsChainBuilder
com/alibaba/csp/sentinel/slots/DefaultSlotsChainBuilder.java
public class DefaultSlotsChainBuilder implements SlotsChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }
}
StatisticSlot is placed before SystemSlot, AuthoritySlot, FlowSlot, and DegradeSlot in the chain, but it updates its counters only after fireEntry returns, which is comparable to after advice in AOP. The node's curThreadNum can therefore be read as the number of threads currently executing requests that have already passed the flow limiting checks
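The ordering effect can be made visible with a toy chain. This is a hedged sketch, not Sentinel's ProcessorSlotChain: `ChainOrderSketch`, `Slot`, `STATISTIC` and `FLOW` are hypothetical names, and each slot receives a `Runnable` that plays the role of fireEntry for the rest of the chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChainOrderSketch {
    /** Hypothetical slot: does its work and decides when to call the next slot. */
    interface Slot {
        void entry(List<String> log, Runnable next);
    }

    /** Sits early in the chain but counts only after the rest has returned. */
    static final Slot STATISTIC = (log, next) -> {
        next.run();
        log.add("statistic: pass counted");
    };

    /** Stand-in for a rule check that lets the request through. */
    static final Slot FLOW = (log, next) -> {
        log.add("flow: checked");
        next.run();
    };

    /** Runs the chain from its first slot and returns the order of events. */
    static List<String> run(List<Slot> chain) {
        List<String> log = new ArrayList<>();
        invoke(chain, 0, log);
        return log;
    }

    private static void invoke(List<Slot> chain, int index, List<String> log) {
        if (index == chain.size()) {
            return; // end of the chain
        }
        chain.get(index).entry(log, () -> invoke(chain, index + 1, log));
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(STATISTIC, FLOW)));
    }
}
```

Although `STATISTIC` is first in the list, the program prints `[flow: checked, statistic: pass counted]`, because the statistic step runs its own logic after the call into the remainder of the chain.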
Summary
- StatisticSlot behaves like after advice: it updates its counters only once all subsequent slots have let the request through
- The statistics curThreadNum, rollingCounterInSecond, and rollingCounterInMinute are maintained by StatisticNode
- DefaultController applies the configured limit according to the rule's grade: for FLOW_GRADE_THREAD it compares against curThreadNum; otherwise it compares against passQps
Doc
- DefaultController