WeChat public account: the landlord's small black road is far away. Learning is boundless and the future is bright; let's keep going together!

Sentinel has an important feature: real-time statistical analysis of data. We can obtain, per second or per minute, the number of requests, the number of blocks, or the response time of a resource in each context's invocation chain. We can also get the global number of requests, blocks, or response time of a resource. The main implementation logic is in StatisticSlot.

StatisticSlot is the third slot in the slot chain and is responsible for collecting real-time statistics about resources. When a slot in the slot chain is invoked, that slot's entry method is triggered.

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
    try {
        // Trigger the entry method of the next slot
        fireEntry(context, resourceWrapper, node, count, args);
        // Reaching this line means every later slot let the call through:
        // it was neither flow-limited nor degraded, so record the statistics
        node.increaseThreadNum();
        node.addPassRequest(count);
        // Some code omitted
    } catch (BlockException e) {
        context.getCurEntry().setError(e);
        // Add block count.
        node.increaseBlockedQps();
        // Some code omitted
        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);
        // Should not happen
        node.increaseExceptionQps();
        // Some code omitted
        throw e;
    }
}


entry() is divided into three parts:
1) It calls fireEntry() to trigger the entry methods of the subsequent slots, such as SystemSlot, FlowSlot, and DegradeSlot.
2) If the subsequent slots all pass without throwing a BlockException, the resource is invoked successfully. In this case, the number of executing threads and the number of passed requests are increased.
3) If one of the subsequent slots rejects the call, it throws a BlockException. When a BlockException is caught, the block count is increased; when any other exception is caught, the exception count is increased.
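To make the try/catch shape of entry() concrete, here is a minimal, self-contained sketch. It is not Sentinel's API: SlotChainSketch, StatSlot, FlowLimitSlot and BlockedException are hypothetical stand-ins, the "chain" has only one later slot, and a blocked call returns false instead of rethrowing the exception as the real StatisticSlot does.

```java
import java.util.concurrent.atomic.LongAdder;

public class SlotChainSketch {

    // Hypothetical stand-in for Sentinel's BlockException.
    static class BlockedException extends Exception {
        BlockedException(String message) {
            super(message);
        }
    }

    // Hypothetical "later slot": rejects the call once `limit` threads are running.
    static class FlowLimitSlot {
        private final long limit;

        FlowLimitSlot(long limit) {
            this.limit = limit;
        }

        void entry(long runningThreads) throws BlockedException {
            if (runningThreads >= limit) {
                throw new BlockedException("flow limited");
            }
        }
    }

    // Hypothetical statistic slot with the same try/catch shape as StatisticSlot.entry().
    // The check-then-increment is not atomic; that is acceptable for a single-threaded demo.
    static class StatSlot {
        final LongAdder threads = new LongAdder();
        final LongAdder pass = new LongAdder();
        final LongAdder block = new LongAdder();
        private final FlowLimitSlot next;

        StatSlot(FlowLimitSlot next) {
            this.next = next;
        }

        boolean entry() {
            try {
                next.entry(threads.sum()); // like fireEntry(): run the later slots first
                threads.increment();       // the call passed: one more running thread
                pass.increment();          // and one more passed request
                return true;
            } catch (BlockedException e) {
                block.increment();         // a later slot rejected the call
                return false;              // the real slot rethrows instead
            }
        }

        void exit() {
            threads.decrement();
        }
    }

    public static void main(String[] args) {
        StatSlot slot = new StatSlot(new FlowLimitSlot(2));
        slot.entry();
        slot.entry();
        slot.entry(); // blocked: two threads are already running
        System.out.println("pass=" + slot.pass.sum() + " block=" + slot.block.sum()); // pass=2 block=1
    }
}
```

The key design point carried over from StatisticSlot is ordering: statistics are recorded only after the later slots have decided, so the pass counter never includes a request that was eventually rejected.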

The exit() method is called when exiting:

public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    DefaultNode node = (DefaultNode) context.getCurNode();
    if (context.getCurEntry().getError() == null) {
        // Response time = current time - creation time of the current entry, in milliseconds
        long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
        if (rt > Constants.TIME_DROP_VALVE) {
            rt = Constants.TIME_DROP_VALVE;
        }
        // Record the response time and the success count
        node.addRtAndSuccess(rt, count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
        }
        // Decrease the thread count by 1
        node.decreaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().decreaseThreadNum();
        }
        // Global statistics for inbound traffic: record RT and success, thread count - 1
        if (resourceWrapper.getType() == EntryType.IN) {
            Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
            Constants.ENTRY_NODE.decreaseThreadNum();
        }
    } else {
        // Error may happen.
    }
    // Other logic omitted
    fireExit(context, resourceWrapper, count);
}


On exit, the focus is the response time: it is recorded in the Node, and the current number of active threads is decreased by one.
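A minimal sketch of the success path of exit(), assuming a hypothetical Node with plain LongAdder counters. MAX_RT_MS stands in for Constants.TIME_DROP_VALVE; the value 4900 is used here only for illustration and is not taken from this article.

```java
import java.util.concurrent.atomic.LongAdder;

public class ExitSketch {

    // Illustrative cap standing in for Constants.TIME_DROP_VALVE (value assumed).
    static final long MAX_RT_MS = 4900;

    // Hypothetical node with just the counters exit() touches.
    static class Node {
        final LongAdder success = new LongAdder();
        final LongAdder rtSum = new LongAdder();
        final LongAdder threads = new LongAdder();

        void addRtAndSuccess(long rt, int count) {
            success.add(count);
            rtSum.add(rt);
        }

        void decreaseThreadNum() {
            threads.decrement();
        }
    }

    // Response time = now - entry creation time, clamped to MAX_RT_MS.
    static long responseTime(long createTimeMs, long nowMs) {
        return Math.min(nowMs - createTimeMs, MAX_RT_MS);
    }

    // Mirrors the success path of exit(): record RT and success, then release the thread.
    static void exit(Node node, long createTimeMs, long nowMs, int count) {
        long rt = responseTime(createTimeMs, nowMs);
        node.addRtAndSuccess(rt, count);
        node.decreaseThreadNum();
    }

    public static void main(String[] args) {
        Node node = new Node();
        node.threads.increment();    // as entry() would have done
        exit(node, 1_000, 1_120, 1); // a 120 ms call
        System.out.println(node.rtSum.sum() + " " + node.threads.sum()); // 120 0
    }
}
```

Clamping keeps a single pathological call (for example one stuck for minutes) from dominating the accumulated response time of its window.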

That is the overall flow, but it does not yet show how the counting actually works. Next, I will analyze how the QPS count is computed.

In the entry() method above, node.addPassRequest(count) is called to count QPS. In DefaultNode it looks like this:

@Override
public void addPassRequest(int count) {
    // DefaultNode: real-time metrics of the resource within one context
    super.addPassRequest(count);
    // ClusterNode: totals of the resource's real-time metrics across all contexts
    this.clusterNode.addPassRequest(count);
}
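The split between the per-context DefaultNode and the shared ClusterNode can be sketched as follows. The classes are simplified, hypothetical versions that keep only a pass counter; they show why one addPassRequest() call is counted twice, once for the context and once for the resource as a whole.

```java
import java.util.concurrent.atomic.LongAdder;

public class NodeSketch {

    // Simplified StatisticNode: only a pass counter.
    static class StatNode {
        final LongAdder pass = new LongAdder();

        void addPassRequest(int count) {
            pass.add(count);
        }
    }

    // One per resource, shared by every context that calls the resource.
    static class ClusterNode extends StatNode {
    }

    // One per (context, resource); also forwards each update to the shared ClusterNode.
    static class DefaultNode extends StatNode {
        private final ClusterNode clusterNode;

        DefaultNode(ClusterNode clusterNode) {
            this.clusterNode = clusterNode;
        }

        @Override
        void addPassRequest(int count) {
            super.addPassRequest(count);       // per-context statistics
            clusterNode.addPassRequest(count); // resource-wide statistics
        }
    }

    public static void main(String[] args) {
        ClusterNode cluster = new ClusterNode();
        DefaultNode inContextA = new DefaultNode(cluster);
        DefaultNode inContextB = new DefaultNode(cluster);
        inContextA.addPassRequest(1);
        inContextB.addPassRequest(2);
        // prints "A=1 B=2 cluster=3"
        System.out.println("A=" + inContextA.pass.sum() + " B=" + inContextB.pass.sum()
                + " cluster=" + cluster.pass.sum());
    }
}
```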


Both nodes are subclasses of StatisticNode, and eventually call methods in StatisticNode.

@Override
public void addPassRequest(int count) {
    // Second-level statistics
    rollingCounterInSecond.addPass(count);
    // Minute-level statistics
    rollingCounterInMinute.addPass(count);
}


The underlying principles of second-level and minute-level statistics are the same. The following sections analyze second-level statistics.


Correction: second-level statistics actually use OccupiableBucketLeapArray, not BucketLeapArray. To make things easier to follow, the analysis below assumes second-level statistics use BucketLeapArray. Minute-level statistics really do use BucketLeapArray (the three-argument constructor with enableOccupy = false), with 60 windows of 1 s each.


public class StatisticNode implements Node {
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    // ...
}

public class ArrayMetric implements Metric {
    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
    // ...
}


The code above involves several important classes: ArrayMetric, BucketLeapArray, MetricBucket, and WindowWrap.

WindowWrap

WindowWrap wraps a single sliding window; its value of type T is a MetricBucket.

public class WindowWrap<T> {
    // Length of a window in milliseconds
    private final long windowLengthInMs;
    // Window start timestamp (in milliseconds)
    private long windowStart;
    // The statistics of this window (a MetricBucket)
    private T value;
    // ...
}


MetricBucket

Represents the metric data for a period of time, stored in an array of LongAdder. The counters are: number of passes, number of blocks, number of exceptions, number of successes, response time, and passes that occupied future quota (OCCUPIED_PASS). Compared with AtomicLong, LongAdder offers better throughput under high concurrency, at the cost of more memory.

public class MetricBucket {
    private final LongAdder[] counters;
    private volatile long minRt;

    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }
}

public enum MetricEvent {
    PASS,
    BLOCK,
    EXCEPTION,
    SUCCESS,
    RT,
    OCCUPIED_PASS
}
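As a sketch of the bucket layout described above, the class below keeps one LongAdder per MetricEvent and indexes the array by ordinal(). It is a simplified, hypothetical MetricBucket (no minRt tracking, no special RT handling), written only to show the indexing scheme and the reset used when a window is reused.

```java
import java.util.concurrent.atomic.LongAdder;

public class BucketSketch {

    enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS }

    // Simplified, hypothetical MetricBucket: one LongAdder per event, indexed by ordinal().
    static class MetricBucket {
        private final LongAdder[] counters;

        MetricBucket() {
            MetricEvent[] events = MetricEvent.values();
            counters = new LongAdder[events.length];
            for (MetricEvent event : events) {
                counters[event.ordinal()] = new LongAdder();
            }
        }

        MetricBucket add(MetricEvent event, long n) {
            counters[event.ordinal()].add(n);
            return this;
        }

        long get(MetricEvent event) {
            return counters[event.ordinal()].sum();
        }

        // Zero every counter, e.g. when the window is reused for a new time span.
        MetricBucket reset() {
            for (LongAdder counter : counters) {
                counter.reset();
            }
            return this;
        }
    }

    public static void main(String[] args) {
        MetricBucket bucket = new MetricBucket();
        bucket.add(MetricEvent.PASS, 3).add(MetricEvent.BLOCK, 1);
        System.out.println(bucket.get(MetricEvent.PASS) + " " + bucket.get(MetricEvent.BLOCK)); // 3 1
        bucket.reset();
        System.out.println(bucket.get(MetricEvent.PASS)); // 0
    }
}
```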


LeapArray

The basic data structure for statistical metrics in Sentinel.

public LeapArray(int sampleCount, int intervalInMs) {
    // Length of each time window
    this.windowLengthInMs = intervalInMs / sampleCount;
    // Total time interval, in milliseconds
    this.intervalInMs = intervalInMs;
    // Number of sampling windows, i.e. the array length
    this.sampleCount = sampleCount;
    this.array = new AtomicReferenceArray<>(sampleCount);
}


For second-level statistics, the window array has length 2 by default, and each time window is 500 ms long.

The first step in counting QPS is to call data.currentWindow() to get the current time window.

public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}


Adding QPS, step 1

Here is currentWindow(long timeMillis), broken down step by step.

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // Map the given time to an index in the array (the default array length is 2,
    // so idx is 0 or 1)
    int idx = calculateTimeIdx(timeMillis);
    // Compute the start time of the window that contains the given time
    long windowStart = calculateWindowStart(timeMillis);

private int calculateTimeIdx(long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    return (int)(timeId % array.length());
}

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}


Sentinel uses two sampling windows by default because it is designed to be a lightweight framework. Each time window stores a lot of statistics, so having too many windows would, on the one hand, use too much memory; on the other hand, more windows means each window is shorter, and shorter windows make the sliding window slide too frequently.
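A worked example of calculateTimeIdx() and calculateWindowStart(), assuming the default second-level configuration (2 windows over 1000 ms, so 500 ms each) and the minute-level one (60 windows over 60 000 ms, so 1000 ms each). WindowIndexSketch is a hypothetical helper that copies the two formulas above; it uses sampleCount where the original uses array.length(), which is the same value.

```java
public class WindowIndexSketch {

    private final int sampleCount;
    private final long windowLengthInMs;

    WindowIndexSketch(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.windowLengthInMs = intervalInMs / sampleCount; // 1000 / 2 = 500 by default
    }

    // Which array slot a timestamp maps to (same formula as calculateTimeIdx).
    int calculateTimeIdx(long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        return (int) (timeId % sampleCount);
    }

    // Start of the window containing the timestamp (same formula as calculateWindowStart).
    long calculateWindowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    public static void main(String[] args) {
        WindowIndexSketch second = new WindowIndexSketch(2, 1000);
        for (long t : new long[] {1234, 1500, 1999, 2000}) {
            System.out.println(t + " -> idx " + second.calculateTimeIdx(t)
                    + ", start " + second.calculateWindowStart(t));
        }
        // 1234 -> idx 0, start 1000
        // 1500 -> idx 1, start 1500
        // 1999 -> idx 1, start 1500
        // 2000 -> idx 0, start 2000
        WindowIndexSketch minute = new WindowIndexSketch(60, 60 * 1000);
        System.out.println(minute.calculateTimeIdx(61_234) + " " + minute.calculateWindowStart(61_234)); // 1 61000
    }
}
```

Times 1234 ms and 2000 ms land on the same slot (index 0) but with different window starts (1000 and 2000). That mismatch between the computed start and the stored start is exactly what the loop below detects in order to reuse and reset a slot.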

while (true) {
    // Get the old window stored at this index
    WindowWrap<T> old = array.get(idx);
    if (old == null) {
        // No window yet: create one
        WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        // Install it with CAS
        if (array.compareAndSet(idx, null, window)) {
            return window;
        } else {
            // Another thread won the race: give up the time slice and retry
            Thread.yield();
        }
    } else if (windowStart == old.windowStart()) {
        // Same start time: the old window is still current, so reuse it
        return old;
    } else if (windowStart > old.windowStart()) {
        // The old window has expired (its start time is earlier than the computed one)
        if (updateLock.tryLock()) {
            try {
                // Move the old window's start time to the computed start time
                // and reset its statistics to 0
                return resetWindowTo(old, windowStart);
            } finally {
                updateLock.unlock();
            }
        } else {
            Thread.yield();
        }
    } else if (windowStart < old.windowStart()) {
        // Should not normally happen (the given time is behind the stored window):
        // return a fresh window that is not stored in the array
        return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
    }
}

@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
    // Update the start time and reset value.
    w.resetTo(startTime);
    // w.value() is the MetricBucket
    w.value().reset();
    return w;
}

// WindowWrap: reset its start time
public WindowWrap<T> resetTo(long startTime) {
    this.windowStart = startTime;
    return this;
}

// MetricBucket: reset all statistics to 0
public void reset() {
    internalReset(0L);
}
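Putting the loop and the reset together, here is a minimal, hypothetical LeapArray that keeps only a PASS counter per window. It follows the four branches of currentWindow() above (create with CAS, reuse, reset under a lock, fall back to a throwaway window). One deliberate difference, which is our addition and not in the quoted code: after acquiring the lock it re-checks the window start and then loops again, instead of resetting unconditionally, so two threads that both saw the stale window cannot reset it twice.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

public class MiniLeapArray {

    // Simplified WindowWrap whose "bucket" is a single PASS counter.
    static final class Window {
        volatile long windowStart;
        final LongAdder pass = new LongAdder();

        Window(long windowStart) {
            this.windowStart = windowStart;
        }
    }

    private final int sampleCount;
    private final long windowLengthInMs;
    private final AtomicReferenceArray<Window> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    MiniLeapArray(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    Window currentWindow(long timeMillis) {
        int idx = (int) ((timeMillis / windowLengthInMs) % sampleCount);
        long windowStart = timeMillis - timeMillis % windowLengthInMs;
        while (true) {
            Window old = array.get(idx);
            if (old == null) {
                // Empty slot: try to install a new window with CAS.
                Window window = new Window(windowStart);
                if (array.compareAndSet(idx, null, window)) {
                    return window;
                }
                Thread.yield(); // another thread won the race; retry
            } else if (windowStart == old.windowStart) {
                return old;     // the slot already holds the current window
            } else if (windowStart > old.windowStart) {
                // Stale window: clear it and move its start forward, under the lock.
                if (updateLock.tryLock()) {
                    try {
                        if (old.windowStart < windowStart) { // re-check (not in the quoted code)
                            old.pass.reset();                 // clear the counters first...
                            old.windowStart = windowStart;    // ...then publish the new start
                        }
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
                // Loop again; the next iteration normally returns the reset window.
            } else {
                // Time went backwards: hand out a throwaway window that is not stored.
                return new Window(windowStart);
            }
        }
    }

    public static void main(String[] args) {
        MiniLeapArray leap = new MiniLeapArray(2, 1000);
        leap.currentWindow(1234).pass.add(3);                    // slot 0, window [1000, 1500)
        leap.currentWindow(1400).pass.add(2);                    // same window
        System.out.println(leap.currentWindow(1499).pass.sum()); // 5
        System.out.println(leap.currentWindow(2100).pass.sum()); // 0: slot 0 reset to [2000, 2500)
    }
}
```

Reusing the array slot instead of allocating a new window keeps memory fixed at sampleCount windows per metric, which fits the lightweight design described earlier.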

Adding QPS, step 2

wrap.value().addPass(count): wrap.value() first returns the value held by the WindowWrap, i.e. the MetricBucket; addPass() then atomically adds count to the PASS counter of that window.

// WindowWrap
private T value;

public WindowWrap(long windowLengthInMs, long windowStart, T value) {
    this.windowLengthInMs = windowLengthInMs;
    this.windowStart = windowStart;
    this.value = value;
}

public T value() {
    return value;
}

// MetricBucket
public void addPass(int n) {
    add(MetricEvent.PASS, n);
}

public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}


That is the whole process of adding QPS.

Retrieving QPS data

Now that the data has been added, how do we query it?

Class hierarchy: DefaultNode and ClusterNode both extend StatisticNode, which implements Node.

@Override
public double passQps() {
    // @(1): get the pass count summed over the current window array
    // @(2): divide by the interval length in seconds
    return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}


Analysis of code @(1)

@Override
public long pass() {
    // Same call as before: bring the current window up to date (reset it if stale)
    data.currentWindow();
    long pass = 0;
    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        pass += window.pass();
    }
    return pass;
}

public List<T> values() {
    return values(TimeUtil.currentTimeMillis());
}

public List<T> values(long timeMillis) {
    if (timeMillis < 0) {
        return new ArrayList<T>();
    }
    int size = array.length();
    List<T> result = new ArrayList<T>(size);
    for (int i = 0; i < size; i++) {
        WindowWrap<T> windowWrap = array.get(i);
        // Skip empty slots and expired windows
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        // windowWrap.value() is the MetricBucket
        result.add(windowWrap.value());
    }
    return result;
}


If the current time minus a window's start time exceeds the interval (1 s for second-level statistics), the window has expired and is skipped.

public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}

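The expiry rule and the summation in pass() can be checked in isolation. The sketch below uses a hypothetical immutable Window (start time plus pass count) in place of WindowWrap<MetricBucket>, and the same comparison as isWindowDeprecated().

```java
import java.util.List;

public class PassSumSketch {

    // Hypothetical snapshot of one window: its start time and its PASS count.
    static final class Window {
        final long windowStart;
        final long pass;

        Window(long windowStart, long pass) {
            this.windowStart = windowStart;
            this.pass = pass;
        }
    }

    // Same rule as isWindowDeprecated(): older than one full interval means expired.
    static boolean isWindowDeprecated(long time, Window window, long intervalInMs) {
        return time - window.windowStart > intervalInMs;
    }

    // Sum PASS over the windows that are still valid at `time`.
    static long pass(List<Window> windows, long time, long intervalInMs) {
        long pass = 0;
        for (Window window : windows) {
            if (window == null || isWindowDeprecated(time, window, intervalInMs)) {
                continue;
            }
            pass += window.pass;
        }
        return pass;
    }

    public static void main(String[] args) {
        List<Window> windows = List.of(new Window(1000, 4), new Window(1500, 6));
        System.out.println(pass(windows, 1800, 1000)); // 10: both windows are within 1 s
        System.out.println(pass(windows, 2100, 1000)); // 6: the window at 1000 is 1100 ms old
    }
}
```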

Analysis of code @(2)

The interval is stored in milliseconds, but QPS is per second, so the interval is converted to seconds.

@Override
public double getWindowIntervalInSec() {
    return data.getIntervalInSecond();
}

public double getIntervalInSecond() {
    return intervalInMs / 1000.0;
}

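With pass() and getIntervalInSecond() in hand, passQps() is a single division. The sketch below is a hypothetical helper that reproduces only those two formulas. Dividing by 1000.0 instead of 1000 keeps the interval fractional: with integer division, a 500 ms interval would become 0 seconds.

```java
public class PassQpsSketch {

    // Same conversion as getIntervalInSecond(): milliseconds to (fractional) seconds.
    static double intervalInSecond(int intervalInMs) {
        return intervalInMs / 1000.0;
    }

    // passQps = passed requests in the sliding interval / interval length in seconds.
    static double passQps(long passInInterval, int intervalInMs) {
        return passInInterval / intervalInSecond(intervalInMs);
    }

    public static void main(String[] args) {
        System.out.println(passQps(10, 1000));       // 10.0 (second level: 1 s interval)
        System.out.println(passQps(120, 60 * 1000)); // 2.0 (minute level: 60 s interval)
        System.out.println(passQps(3, 500));         // 6.0 (a hypothetical 500 ms interval)
    }
}
```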

That covers the real-time statistics module. Much of this draws on articles by several experts, which are well illustrated and easy to follow; they are listed below:

- Sentinel principle: sliding window
- Alibaba Sentinel sliding window implementation principle (with a principle diagram at the end)
- Source analysis: Sentinel real-time data acquisition principle