Sentinel has an important function: real-time statistics. For a given resource, we can obtain the number of requests, the number of blocked requests, or the response time per second or per minute within each context's invocation chain. We can also obtain the global number of requests, blocked requests, or response time for that resource. The main implementation logic is in StatisticSlot.
StatisticSlot is the third slot in the SlotChain and is responsible for collecting the real-time statistics of resources. When a slot in the SlotChain is invoked, its entry() method is triggered.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // Trigger the entry method of the next slot
        fireEntry(context, resourceWrapper, node, count, args);
        // If the entry methods of the later slots in the SlotChain pass,
        // the request was neither rate-limited nor degraded, so record the statistics
        node.increaseThreadNum();
        node.addPassRequest();
        // Omit some code
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // Omit some code
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // Omit some code
        throw e;
    }
}
The entry() method has three parts:
1. It first triggers the entry methods of the subsequent slots, such as SystemSlot, FlowSlot, and DegradeSlot.
2. If the subsequent slots pass without throwing a BlockException, the resource is invoked successfully, and the thread count and the passed-request count are increased.
3. If a subsequent slot rejects the request, a BlockException is thrown; when it is caught, the block count is increased. For any other exception, the exception count is increased.
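The three parts can be sketched in isolation. This is a minimal illustration, not Sentinel's code: EntryStatsSketch, BlockedException, and DownstreamCheck are hypothetical stand-ins for StatisticSlot, BlockException, and the rest of the slot chain.

```java
import java.util.concurrent.atomic.LongAdder;

class EntryStatsSketch {
    /** Hypothetical stand-in for Sentinel's BlockException. */
    static class BlockedException extends RuntimeException {}

    /** Hypothetical stand-in for the slots that come after the statistics slot. */
    interface DownstreamCheck {
        void check();
    }

    final LongAdder threads = new LongAdder();
    final LongAdder pass = new LongAdder();
    final LongAdder block = new LongAdder();
    final LongAdder exception = new LongAdder();

    void entry(DownstreamCheck rest, int count) {
        try {
            // Part 1: let the later checks run first
            rest.check();
            // Part 2: nothing was thrown, so the request passed
            threads.increment();
            pass.add(count);
        } catch (BlockedException e) {
            // Part 3a: a later check rejected the request
            block.add(count);
            throw e;
        } catch (RuntimeException e) {
            // Part 3b: any other failure counts as an exception
            exception.add(count);
            throw e;
        }
    }

    public static void main(String[] args) {
        EntryStatsSketch stats = new EntryStatsSketch();
        stats.entry(() -> {}, 1);
        try {
            stats.entry(() -> { throw new BlockedException(); }, 1);
        } catch (BlockedException ignored) {
            // Expected: the second request is blocked
        }
        System.out.println("pass=" + stats.pass.sum() + ", block=" + stats.block.sum());
    }
}
```

The statistics are recorded after the downstream checks return, which is why a blocked request never increases the pass count.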
The exit() method is called when exiting:
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    DefaultNode node = (DefaultNode)context.getCurNode();
    if (context.getCurEntry().getError() == null) {
        // Calculate the response time in milliseconds:
        // current time minus the creation time of the current entry
        long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
        if (rt > Constants.TIME_DROP_VALVE) {
            rt = Constants.TIME_DROP_VALVE;
        }
        // Add response time and successes
        node.addRtAndSuccess(rt, count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
        }
        // Decrease the thread count by 1
        node.decreaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().decreaseThreadNum();
        }
        // Update the global statistics and decrease the global thread count by 1
        if (resourceWrapper.getType() == EntryType.IN) {
            Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
            Constants.ENTRY_NODE.decreaseThreadNum();
        }
    } else {
        // Error may happen.
    }
    // Omit other logic
    fireExit(context, resourceWrapper, count);
}
On exit, the focus is the response time: it is recorded in the node, and the number of active threads is decreased by one.
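The response-time bookkeeping on exit can be sketched as follows. This is a simplified, single-threaded illustration: ExitStatsSketch is a hypothetical class, and MAX_RT_MS is an assumed cap standing in for Constants.TIME_DROP_VALVE, whose value the code above does not show.

```java
class ExitStatsSketch {
    /** Assumed cap in milliseconds; stands in for Constants.TIME_DROP_VALVE. */
    static final long MAX_RT_MS = 5000;

    long rtSum;
    long success;
    int threads;

    /** Response time = now minus entry creation time, capped at MAX_RT_MS. */
    static long cappedRt(long createTimeMs, long nowMs) {
        long rt = nowMs - createTimeMs;
        return Math.min(rt, MAX_RT_MS);
    }

    /** Record the response time and successes, then release one active thread. */
    void exit(long createTimeMs, long nowMs, int count) {
        rtSum += cappedRt(createTimeMs, nowMs);
        success += count;
        threads--;
    }

    public static void main(String[] args) {
        ExitStatsSketch stats = new ExitStatsSketch();
        stats.threads = 1; // one request is in flight
        stats.exit(1000, 1250, 1);
        System.out.println("rtSum=" + stats.rtSum + ", threads=" + stats.threads);
    }
}
```

Capping the response time keeps a single very slow call from dominating the average.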
That is the overall flow, but the details are still unclear. Next, let's analyze how QPS is counted.
In the entry() method above, node.addPassRequest() is called to count QPS.
@Override
public void addPassRequest(int count) {
    // DefaultNode: counts the real-time metrics of a resource within one context
    super.addPassRequest(count);
    // ClusterNode: counts the total real-time metrics of a resource across all contexts
    this.clusterNode.addPassRequest(count);
}
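The relationship between the two counters can be illustrated with a small sketch. ContextCounter and ClusterCounter are hypothetical stand-ins for DefaultNode and ClusterNode; only the delegation pattern is taken from the code above.

```java
import java.util.concurrent.atomic.LongAdder;

class NodeSketch {
    /** Hypothetical stand-in for ClusterNode: one per resource, shared by all contexts. */
    static class ClusterCounter {
        final LongAdder pass = new LongAdder();

        void addPassRequest(int count) {
            pass.add(count);
        }
    }

    /** Hypothetical stand-in for DefaultNode: one per resource per context. */
    static class ContextCounter {
        final LongAdder pass = new LongAdder();
        final ClusterCounter cluster;

        ContextCounter(ClusterCounter cluster) {
            this.cluster = cluster;
        }

        void addPassRequest(int count) {
            // Count for this context, then for the resource as a whole
            pass.add(count);
            cluster.addPassRequest(count);
        }
    }

    public static void main(String[] args) {
        ClusterCounter shared = new ClusterCounter();
        ContextCounter contextA = new ContextCounter(shared);
        ContextCounter contextB = new ContextCounter(shared);
        contextA.addPassRequest(2);
        contextB.addPassRequest(3);
        System.out.println("A=" + contextA.pass.sum() + ", B=" + contextB.pass.sum()
                + ", total=" + shared.pass.sum());
    }
}
```

Each context keeps its own view, while the shared counter sees the sum of all contexts.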
Both nodes are subclasses of StatisticNode, and eventually call methods in StatisticNode.
@Override
public void addPassRequest(int count) {
    // Second-level statistics
    rollingCounterInSecond.addPass(count);
    // Minute-level statistics
    rollingCounterInMinute.addPass(count);
}
The underlying principles of second-level statistics and minute-level statistics are the same. The following sections will analyze second-level statistics.
Correction: second-level statistics actually use OccupiableBucketLeapArray rather than BucketLeapArray. To keep things simple, the analysis below assumes that second-level statistics use BucketLeapArray. Minute-level statistics do use BucketLeapArray, with 60 windows of 1 s each.
public class StatisticNode implements Node {
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    // Omit some code
}
public class ArrayMetric implements Metric {
    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
    // Omit some code
}
The code above involves several important classes: ArrayMetric, BucketLeapArray, MetricBucket, and WindowWrap.
WindowWrap
The wrapper class for a single sliding window. Its internal data structure is the generic type T, which here is MetricBucket.
public class WindowWrap<T> {
    // The length of a window in milliseconds
    private final long windowLengthInMs;
    // Window start timestamp (in milliseconds)
    private long windowStart;
    // MetricBucket
    private T value;
    // Omit some code
}
MetricBucket
Represents the metric data over a period of time, stored in an array of LongAdder. The metrics are the number of passed requests, blocked requests, exceptions, and successes, the response time, and the passes occupied from future quota. Compared with AtomicLong, LongAdder has better throughput under high concurrency, at the cost of more space.
public class MetricBucket {
    private final LongAdder[] counters;
    private volatile long minRt;

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
}

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}
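The enum-indexed LongAdder array can be tried out on its own. The sketch below uses java.util.concurrent.atomic.LongAdder from the JDK; BucketSketch, Event, and countConcurrently are hypothetical names, and the bucket tracks only two events for brevity.

```java
import java.util.concurrent.atomic.LongAdder;

class BucketSketch {
    enum Event { PASS, BLOCK }

    // One counter per event, indexed by the enum ordinal
    private final LongAdder[] counters = new LongAdder[Event.values().length];

    BucketSketch() {
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
    }

    void add(Event event, long n) {
        counters[event.ordinal()].add(n);
    }

    long get(Event event) {
        return counters[event.ordinal()].sum();
    }

    void reset() {
        for (LongAdder counter : counters) {
            counter.reset();
        }
    }

    /** Several threads add to the same bucket; returns the final PASS count. */
    static long countConcurrently(int threads, int perThread) {
        BucketSketch bucket = new BucketSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    bucket.add(Event.PASS, 1);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return bucket.get(Event.PASS);
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(4, 1000));
    }
}
```

No increment is lost even though the threads never take a lock, because LongAdder spreads contended updates over several internal cells and sums them on read.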
LeapArray
The basic data structure for statistical metrics in Sentinel.
public LeapArray(int sampleCount, int intervalInMs) {
    // Length of a single time window in milliseconds
    this.windowLengthInMs = intervalInMs / sampleCount;
    // Total time interval in milliseconds
    this.intervalInMs = intervalInMs;
    // Number of sampling windows, i.e. the array length
    this.sampleCount = sampleCount;
    this.array = new AtomicReferenceArray<>(sampleCount);
}
For second-level statistics, the default time window array length is 2, and each time window is 500 ms long.
The first step in counting QPS is to call data.currentWindow() to get the current time window.
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}
Adding QPS: step one
The currentWindow(long timeMillis) method is analyzed piece by piece below.
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // Calculate the array index that the given time maps to
    // (the default array length is 2, so idx can be 0 or 1)
    int idx = calculateTimeIdx(timeMillis);
    // Calculate the start time of the window that contains the given time
    long windowStart = calculateWindowStart(timeMillis);
private int calculateTimeIdx(long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    return (int)(timeId % array.length());
}

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}
Two sampling windows are used by default because Sentinel is positioned as a lightweight framework. Each time window stores a lot of statistical data. With too many time windows, on the one hand they occupy too much memory; on the other hand, each window becomes shorter, and shorter windows slide more frequently.
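The index and start-time arithmetic can be checked with a few sample timestamps. The sketch below copies the two formulas into a hypothetical WindowMathSketch class and passes the window length and sample count as parameters (500 ms and 2, the second-level defaults mentioned earlier).

```java
class WindowMathSketch {
    /** Index of the window in the array: the window number modulo the array length. */
    static int timeIdx(long timeMillis, int windowLengthInMs, int sampleCount) {
        long timeId = timeMillis / windowLengthInMs;
        return (int) (timeId % sampleCount);
    }

    /** Start time of the window: the timestamp rounded down to a multiple of the window length. */
    static long windowStart(long timeMillis, int windowLengthInMs) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    public static void main(String[] args) {
        long[] samples = {0, 499, 500, 999, 1000, 1499, 1500};
        for (long t : samples) {
            System.out.println("t=" + t
                    + " idx=" + timeIdx(t, 500, 2)
                    + " windowStart=" + windowStart(t, 500));
        }
    }
}
```

Timestamps 0 to 499 map to index 0 with start 0, and 500 to 999 map to index 1 with start 500. From 1000 onward the index wraps back to 0 but the start time becomes 1000, which is how the array slot is recognized as holding an expired window.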
    while (true) {
        // Get the old time window stored at this index
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            // If there is none, create one
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            // Install it with CAS
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                // Otherwise, the current thread yields its time slice and then competes again
                Thread.yield();
            }
        // If the computed start time equals the old window's start time, the old window is still current
        } else if (windowStart == old.windowStart()) {
            return old;
        // If the computed start time is later than the old window's start time, the old window has expired
        } else if (windowStart > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    // Change the start time of the old window to the computed start time
                    // and reset the statistics of the window to 0
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        // This should not happen; a new window is returned without being stored in the array
        } else if (windowStart < old.windowStart()) {
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
    // Update the start time and reset value.
    w.resetTo(startTime);
    // w.value() is the MetricBucket
    w.value().reset();
    return w;
}

// Reset the window's start time
public WindowWrap<T> resetTo(long startTime) {
    this.windowStart = startTime;
    return this;
}

// Reset all MetricBucket statistics to 0
public void reset() {
    internalReset(0L);
}
Adding QPS: step two
wrap.value().addPass(count): wrap is the WindowWrap obtained in step one, and wrap.value() is its MetricBucket, which atomically adds count to the PASS counter of the current time window.
// WindowWrap
private T value;

public WindowWrap(long windowLengthInMs, long windowStart, T value) {
    this.windowLengthInMs = windowLengthInMs;
    this.windowStart = windowStart;
    this.value = value;
}

public T value() {
    return value;
}

// MetricBucket
public void addPass(int n) {
    add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}
That is the whole process of adding QPS.
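Both steps can be put together in a compact, self-contained sketch. It is a simplification under stated assumptions: LeapArraySketch and its Window class are hypothetical, each window holds only a pass counter instead of a full MetricBucket, and the timestamp is passed in explicitly so the behavior is easy to follow.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

class LeapArraySketch {
    /** One window: a start timestamp plus a pass counter. */
    static final class Window {
        volatile long start;
        final LongAdder pass = new LongAdder();

        Window(long start) {
            this.start = start;
        }
    }

    private final int windowLengthInMs;
    private final AtomicReferenceArray<Window> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    LeapArraySketch(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    /** Step one: find, create, or recycle the window for the given time. */
    Window currentWindow(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % array.length());
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        while (true) {
            Window old = array.get(idx);
            if (old == null) {
                // Empty slot: try to install a new window with CAS
                Window created = new Window(windowStart);
                if (array.compareAndSet(idx, null, created)) {
                    return created;
                }
                Thread.yield();
            } else if (windowStart == old.start) {
                // The stored window is the current one
                return old;
            } else if (windowStart > old.start) {
                // The stored window has expired: recycle it under the lock
                if (updateLock.tryLock()) {
                    try {
                        old.start = windowStart;
                        old.pass.reset();
                        return old;
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else {
                // The given time is behind the stored window: return a detached window
                return new Window(windowStart);
            }
        }
    }

    /** Step two: add to the counter of the current window. */
    void addPass(long timeMillis, int count) {
        currentWindow(timeMillis).pass.add(count);
    }

    public static void main(String[] args) {
        LeapArraySketch counter = new LeapArraySketch(2, 1000);
        counter.addPass(0, 3);
        counter.addPass(600, 2);
        counter.addPass(1100, 1); // reuses slot 0 and resets it first
        Window window = counter.currentWindow(1100);
        System.out.println("start=" + window.start + ", pass=" + window.pass.sum());
    }
}
```

The window objects are recycled rather than reallocated, so the array never grows beyond sampleCount entries no matter how long the program runs.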
Getting QPS data
The data has been recorded, so how do we query it?
[Figure: class hierarchy. DefaultNode and ClusterNode extend StatisticNode, which implements Node.]
@Override
public double passQps() {
    // pass() gets the number of passed requests in the current time window array @(1)
    // getWindowIntervalInSec() gets the length of the interval in seconds @(2)
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
Analysis of code @(1)
@Override
public long pass() {
    // As in the earlier method, refresh the current window so that an expired one is reset
    data.currentWindow();
    long pass = 0;
    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        pass += window.pass();
    }
    return pass;
}

public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}

public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>();
    }
    int size = array.length();
    List<T> result = new ArrayList<T>(size);
    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        // The value is the MetricBucket
        result.add(windowWrap.value());
    }
    return result;
}
If the current time minus a window's start time exceeds the time interval (1 s for second-level statistics), the window has expired and is not included.
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}
Analysis of code @(2)
The interval is stored in milliseconds, while QPS is measured per second, so the interval is converted to seconds.
@Override
public double getWindowIntervalInSec() {
    return data.getIntervalInSecond();
}

public double getIntervalInSecond() {
    return intervalInMs / 1000.0;
}
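The query side can be worked through with concrete numbers. The sketch below is a simplified illustration: QpsQuerySketch and its Window class are hypothetical, the windows are plain value objects rather than WindowWrap instances, and the current time is passed in explicitly.

```java
import java.util.Arrays;
import java.util.List;

class QpsQuerySketch {
    /** Hypothetical read-only view of a window: its start time and pass count. */
    static final class Window {
        final long start;
        final long pass;

        Window(long start, long pass) {
            this.start = start;
            this.pass = pass;
        }
    }

    /** A window is expired when it started more than one full interval ago. */
    static boolean isDeprecated(long now, long windowStart, int intervalInMs) {
        return now - windowStart > intervalInMs;
    }

    /** Sum the pass counts of all windows that have not expired. */
    static long pass(long now, List<Window> windows, int intervalInMs) {
        long sum = 0;
        for (Window window : windows) {
            if (!isDeprecated(now, window.start, intervalInMs)) {
                sum += window.pass;
            }
        }
        return sum;
    }

    /** QPS = passed requests in the interval divided by the interval length in seconds. */
    static double passQps(long now, List<Window> windows, int intervalInMs) {
        return pass(now, windows, intervalInMs) / (intervalInMs / 1000.0);
    }

    public static void main(String[] args) {
        List<Window> windows = Arrays.asList(new Window(1000, 1), new Window(500, 2));
        System.out.println(passQps(1100, windows, 1000));
        System.out.println(passQps(1600, windows, 1000));
    }
}
```

At time 1100 both windows are within the 1000 ms interval, so the QPS is 3.0. At time 1600 the window that started at 500 is 1100 ms old and is skipped, so the QPS drops to 1.0.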
That concludes the real-time statistics module. Much of this article draws on several excellent articles, which are well illustrated and easy to follow. Recommended reading:
Sentinel principle - Sliding window
Alibaba Sentinel sliding window implementation principle (with a principle diagram at the end)
Source analysis of Sentinel's real-time data acquisition principle