A brief introduction to Sentinel

Sentinel is Alibaba's open-source flow control component. With traffic as its entry point, Sentinel provides flow control, circuit breaking and degradation, system protection, and other functions to keep a system running smoothly. There are currently two main approaches to flow control: one, represented by Hystrix, is based on thread pool isolation; the other is based on semaphores, which is the approach Sentinel takes.

Usage

This article does not go into the details of usage; refer to the project's GitHub page for those.

Simple use steps:

1. Introduce dependencies

  <dependency>
      <groupId>com.alibaba.csp</groupId>
      <artifactId>sentinel-core</artifactId>
      <version>1.7.2</version>
  </dependency>

2. Define resources and rules in your code

Resources: a piece of code, a method, anything in a project

Rules: Flow control rules, such as QPS, that you define for resources

  public static void main(String[] args) {
      initFlowRules();
      while (true) {
          Entry entry = null;
          try {
              entry = SphU.entry("HelloWorld");
              /* Your business logic - start */
              System.out.println("hello world");
              /* Your business logic - end */
          } catch (BlockException e1) {
              /* Flow control logic - start */
              System.out.println("block!");
              /* Flow control logic - end */
          } finally {
              // Always exit the entry so that statistics stay consistent.
              if (entry != null) {
                  entry.exit();
              }
          }
      }
  }

  private static void initFlowRules() {
      List<FlowRule> rules = new ArrayList<>();
      FlowRule rule = new FlowRule();
      rule.setResource("HelloWorld");
      rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
      // Set limit QPS to 20.
      rule.setCount(20);
      rules.add(rule);
      FlowRuleManager.loadRules(rules);
  }

In the code above, the business logic is defined as a resource named “HelloWorld” with a QPS limit of 20.

Principle analysis

As the code above shows, flow control starts with an object called Entry, which, as its name implies, can be understood as an entrance. The entrance protects our code or resources: every request has to pass the entrance check, and it is let through only if the check succeeds; otherwise it is rejected. The rules for the check are left to the developer. The following diagram from the official Sentinel documentation roughly describes how Sentinel works. It may be a bit confusing if you are new to Sentinel, so here is a quick overview.

When a request comes in, Entry creates a processing chain, ProcessorSlotChain, for each resource. By default, the system provides eight handlers that form this chain, and each handler performs its own function. The handler that performs the flow control check is named FlowSlot. If the request makes it through the ProcessorSlotChain, it is allowed to proceed; if not, an exception is thrown. Sentinel takes flow control as its core, and the underlying traffic statistics are collected with a sliding window, which is implemented by an object named LeapArray. The processing chain is shown in the figure below:

Core concepts and classes

Before analyzing the source code, let's introduce a few core concepts and objects; otherwise the code will be hard to follow.

Entry: As mentioned above, the resources to be protected must be wrapped with Entry:

  Entry entry = SphU.entry("HelloWorld");
  // ... protected resource ...
  entry.exit();

Context: Each entry has a specific Context. It is a ThreadLocal variable that uses the name field to distinguish different contexts. Each Context represents an EntranceNode.

ResourceWrapper: The resource wrapper class. The SphU.entry("HelloWorld") statement above actually creates a ResourceWrapper named HelloWorld. A resource is globally unique.

  public abstract class ResourceWrapper {
      protected final String name; // Resource name
      // ...
  }

ProcessorSlotChain: This is the core component; functions such as flow control and circuit breaking are all implemented through it. It holds a series of Slot objects, each of which performs its own function. The chain of Slot objects is built via SPI.

  # Sentinel default ProcessorSlots
  # Provides a Node object for each ProcessorSlotChain
  com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
  # Builds the clusterNode and originNode objects for each ProcessorSlotChain
  com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
  # Logging
  com.alibaba.csp.sentinel.slots.logger.LogSlot
  # Traffic statistics
  com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
  # System protection rule check
  com.alibaba.csp.sentinel.slots.system.SystemSlot
  # Authority check
  com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
  # Flow control check
  com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
  # Circuit breaking and degradation check
  com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

In addition to the eight Slot objects provided by default, developers can define their own Slot objects and register them via SPI.
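To make the idea of a slot chain concrete, here is a minimal sketch of the chain-of-responsibility pattern it is based on. It is not Sentinel's code: the names Slot, SlotChain, Cursor, and SlotChainDemo are hypothetical, and the three lambdas only stand in for node selection, statistics, and the flow check.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified slot; not Sentinel's ProcessorSlot interface. */
interface Slot {
    void entry(String resource, SlotChain.Cursor next) throws Exception;
}

/** Holds the slots in order and walks them one by one. */
final class SlotChain {
    private final List<Slot> slots = new ArrayList<>();

    SlotChain add(Slot slot) {
        slots.add(slot);
        return this;
    }

    void entry(String resource) throws Exception {
        new Cursor(0).fire(resource);
    }

    /** Points at the next slot to run; a slot calls fire() to continue the chain. */
    final class Cursor {
        private final int index;

        Cursor(int index) {
            this.index = index;
        }

        void fire(String resource) throws Exception {
            if (index < slots.size()) {
                slots.get(index).entry(resource, new Cursor(index + 1));
            }
        }
    }
}

final class SlotChainDemo {
    /** Runs a three-slot chain and returns the order in which the slots ran. */
    static List<String> run(boolean block) {
        List<String> trace = new ArrayList<>();
        SlotChain chain = new SlotChain()
            .add((res, next) -> { trace.add("select-node"); next.fire(res); })
            .add((res, next) -> { trace.add("statistic"); next.fire(res); })
            .add((res, next) -> {
                trace.add("flow-check");
                if (block) {
                    // A failed check stops the chain by throwing.
                    throw new IllegalStateException("blocked: " + res);
                }
                next.fire(res);
            });
        try {
            chain.entry("HelloWorld");
            trace.add("passed");
        } catch (Exception e) {
            trace.add(e.getMessage());
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Each slot decides whether to hand the request on, so a new check can be added as another slot without touching the existing ones.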

Node: Used to perform statistics-related functions. NodeSelectorSlot, the first Slot object in ProcessorSlotChain, is used to create or obtain Node objects. Each subsequent Slot passes this Node object through for statistics-related functions.
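The statistics that a Node holds are kept in sliding windows. The sketch below shows the general technique under simplifying assumptions; it is not Sentinel's LeapArray. The class SlidingWindowCounter and its methods are hypothetical, timestamps are passed in explicitly (and assumed non-negative), and a single lock replaces the finer-grained concurrency a real implementation would need.

```java
import java.util.Arrays;

/**
 * Simplified sliding-window counter built on a ring of time buckets.
 * Hypothetical illustration only; not Sentinel's LeapArray.
 */
final class SlidingWindowCounter {
    private final int bucketCount;      // number of buckets in the window
    private final long bucketLengthMs;  // time span covered by one bucket
    private final long[] bucketStart;   // start time of the data held in each slot
    private final long[] counts;        // events counted in each slot

    SlidingWindowCounter(int bucketCount, long bucketLengthMs) {
        this.bucketCount = bucketCount;
        this.bucketLengthMs = bucketLengthMs;
        this.bucketStart = new long[bucketCount];
        this.counts = new long[bucketCount];
        Arrays.fill(bucketStart, -1L); // -1 marks a slot that has never been used
    }

    /** Records n events at the given timestamp. */
    synchronized void add(long nowMs, long n) {
        long start = nowMs - nowMs % bucketLengthMs;
        int idx = (int) ((nowMs / bucketLengthMs) % bucketCount);
        if (bucketStart[idx] != start) {
            // The slot still holds data from an earlier lap of the ring: reset it.
            bucketStart[idx] = start;
            counts[idx] = 0;
        }
        counts[idx] += n;
    }

    /** Sums the buckets that still fall inside the window ending at nowMs. */
    synchronized long total(long nowMs) {
        long windowMs = bucketCount * bucketLengthMs;
        long sum = 0;
        for (int i = 0; i < bucketCount; i++) {
            if (bucketStart[i] >= 0 && nowMs - bucketStart[i] < windowMs) {
                sum += counts[i];
            }
        }
        return sum;
    }
}
```

With two 500 ms buckets, for example, the counter covers a one-second window, and old buckets drop out of the total as time advances instead of the whole count resetting at once.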

Core source code analysis

With these core concepts in place, let's move on to the source code analysis.

1. Create an Entry: SphU

This class provides the API for creating an Entry. There is a similar class called SphO. The difference is that SphU rejects a request by throwing an exception, while SphO returns a boolean to indicate the result.

SphU

  public static Entry entry(String name) throws BlockException {
      return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
  }

SphO

  public static boolean entry(String name) {
      return entry(name, EntryType.OUT, 1, OBJECTS0);
  }

The entry method of SphU delegates to the sph object defined in the Env class:

  public class Env {

      public static final Sph sph = new CtSph();

      static {
          // If init fails, the process will exit.
          // Initialize the InitFunc implementations, which are registered via SPI.
          InitExecutor.doInit();
      }
  }

Next, enter the entry method of CtSph. This class has several overloads of entry; we analyze only one of them:

    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }

2. Create the Context and ProcessorSlotChain objects

In the method above, we can see that each entry is associated with a resource, which is uniquely identified by its name and type. The code then moves on to the entryWithPriority method, which is an important one: it creates the Context object, obtains the ProcessorSlotChain object, and is the entry point for rule validation.

  /**
   * resourceWrapper: the resource
   * count: number of permits requested
   * prioritized: whether the request is prioritized
   * args: additional arguments
   */
  private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
      throws BlockException {
      // Get the Context object, a ThreadLocal variable
      Context context = ContextUtil.getContext();
      // If the number of contexts created has reached the upper limit (2000), a NullContext is returned
      if (context instanceof NullContext) {
          // Create an entry object that does not perform rule validation
          return new CtEntry(resourceWrapper, null, context);
      }

      if (context == null) {
          // Create a context object with the default name: sentinel_default_context
          context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
      }

      // Global control switch; if it is off, no rule check is performed
      if (!Constants.ON) {
          return new CtEntry(resourceWrapper, null, context);
      }

      // Look up (or create) the ProcessorSlotChain for this resource
      ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

      // A null chain means the number of chains has reached its limit; no rule check is performed
      if (chain == null) {
          return new CtEntry(resourceWrapper, null, context);
      }

      Entry e = new CtEntry(resourceWrapper, chain, context);
      try {
          // Run the rule checks
          chain.entry(context, resourceWrapper, null, count, prioritized, args);
      } catch (BlockException e1) {
          // The request was blocked: exit the entry and rethrow
          e.exit(count, args);
          throw e1;
      } catch (Throwable e1) {
          // This should not happen, unless there are errors existing in Sentinel internal.
          RecordLog.info("Sentinel unexpected exception", e1);
      }
      return e;
  }

The code above covers the entire flow control rule verification process. Let's first look at how the Context object is created.

1.ContextUtil.getContext()

  private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();

  // ...

  public static Context getContext() {
      return contextHolder.get();
  }

Context is a thread-local variable, so getContext() returns null the first time a thread enters. A new Context object is then created in the trueEnter method of the ContextUtil class:

  protected static Context trueEnter(String name, String origin) {
      Context context = contextHolder.get();
      if (context == null) {
          // Get the context node cache; the key is the context name and the value is an EntranceNode
          Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
          DefaultNode node = localCacheNameMap.get(name);
          if (node == null) {
              if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                  setNullContext();
                  return NULL_CONTEXT;
              } else {
                  LOCK.lock();
                  try {
                      // Check again under the lock
                      node = contextNameNodeMap.get(name);
                      if (node == null) {
                          if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                              setNullContext();
                              return NULL_CONTEXT;
                          } else {
                              // Create the EntranceNode; each context has a unique entrance node
                              node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                              // Mount the new EntranceNode under the ROOT node
                              Constants.ROOT.addChild(node);

                              // Add the newly created node to the cache
                              Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                              newMap.putAll(contextNameNodeMap);
                              newMap.put(name, node);
                              contextNameNodeMap = newMap;
                          }
                      }
                  } finally {
                      LOCK.unlock();
                  }
              }
          }
          // Create the context and bind it to the current thread
          context = new Context(node, name);
          context.setOrigin(origin);
          contextHolder.set(context);
      }

      return context;
  }

The code above creates a context. Each context is a thread-local variable and is associated with an EntranceNode that is mounted under the root node. Once the context object is created, the ProcessorSlotChain object is created next; return to the lookProcessChain call in the entryWithPriority method above:

  ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
      ProcessorSlotChain chain = chainMap.get(resourceWrapper);
      if (chain == null) {
          synchronized (LOCK) {
              chain = chainMap.get(resourceWrapper);
              if (chain == null) {
                  // Entry size limit.
                  if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                      return null;
                  }

                  // Create the chain object via SPI
                  chain = SlotChainProvider.newSlotChain();
                  Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                      chainMap.size() + 1);
                  newMap.putAll(chainMap);
                  newMap.put(resourceWrapper, chain);
                  chainMap = newMap;
              }
          }
      }
      return chain;
  }
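Both trueEnter and lookProcessChain follow the same caching idiom: read a map without locking, re-check under a lock, and publish a fresh copy of the map instead of mutating the shared one. The sketch below isolates that idiom. The class CopyOnWriteCache is hypothetical, and it assumes the map field is volatile so that readers always see a fully built copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/**
 * Hypothetical cache showing double-checked locking over a copy-on-write map.
 * Reads take no lock; writes copy the map and swap the reference.
 */
final class CopyOnWriteCache<K, V> {
    private final Object lock = new Object();
    private final int maxSize;
    // volatile: a reader sees either the old map or the fully built new one
    private volatile Map<K, V> map = new HashMap<>();

    CopyOnWriteCache(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Returns the cached value, creating it if absent; null once the cache is full. */
    V getOrCreate(K key, Supplier<V> factory) {
        V value = map.get(key);              // first check, lock-free
        if (value == null) {
            synchronized (lock) {
                value = map.get(key);        // second check, under the lock
                if (value == null) {
                    if (map.size() >= maxSize) {
                        return null;         // size limit reached
                    }
                    value = factory.get();
                    Map<K, V> newMap = new HashMap<>(map.size() + 1);
                    newMap.putAll(map);
                    newMap.put(key, value);
                    map = newMap;            // publish the new copy
                }
            }
        }
        return value;
    }

    int size() {
        return map.size();
    }
}
```

The trade-off is that every insertion copies the whole map, which is acceptable here because resources and contexts are created rarely but looked up on every request.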

The ProcessorSlotChain object is created via SPI, and its implementation classes are declared in an SPI configuration file. Without going into the details, the ProcessorSlotChain is formed by organizing the objects listed in that file in the agreed order. Once these objects are created, the actual rule verification begins. Go back to the chain.entry(context, resourceWrapper, null, count, prioritized, args) line in the entryWithPriority method, which is the entry point for rule validation. The first object in the processing chain is NodeSelectorSlot:

  @SpiOrder(10000)
  public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

      // DefaultNode cache; the key is the name of the context
      private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

      @Override
      public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count,
                        boolean prioritized, Object... args) throws Throwable {
          DefaultNode node = map.get(context.getName());
          if (node == null) {
              synchronized (this) {
                  node = map.get(context.getName());
                  if (node == null) {
                      // Create the node
                      node = new DefaultNode(resourceWrapper, null);
                      HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                      cacheMap.putAll(map);
                      cacheMap.put(context.getName(), node);
                      map = cacheMap;
                      // Build invocation tree
                      ((DefaultNode) context.getLastNode()).addChild(node);
                  }
              }
          }

          context.setCurNode(node);
          fireEntry(context, resourceWrapper, node, count, prioritized, args);
      }

      @Override
      public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
          fireExit(context, resourceWrapper, count, args);
      }
  }

This code is fairly simple: it fetches a DefaultNode based on the name of the context and creates one if it doesn’t exist. The relationships among Context, Entry, ProcessorSlotChain, and DefaultNode are as follows.

  • Context represents a context. Each entry runs in a specific context, and each context has an EntranceNode;
  • Each entry is associated with a specific resource;
  • Each resource has a ProcessorSlotChain to verify the rules;
  • Each ProcessorSlotChain can contain multiple DefaultNodes, but only one ClusterNode.

The node diagram is as follows: EntranceNode1 and EntranceNode2 represent the entry nodes of two contexts. The second object in the processing chain is ClusterBuilderSlot, which maintains the ClusterNode and the origin nodes. A ClusterNode collects statistics for a resource across all contexts, while an origin node collects statistics for entries whose context has the origin attribute set. Here is the main code:

  public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
      if (clusterNode == null) {
          synchronized (lock) {
              if (clusterNode == null) {
                  // Create the ClusterNode
                  clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                  HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                  newMap.putAll(clusterNodeMap);
                  newMap.put(node.getId(), clusterNode);

                  clusterNodeMap = newMap;
              }
          }
      }
      // node is the DefaultNode passed on by NodeSelectorSlot
      node.setClusterNode(clusterNode);

      // If the context has the origin attribute set
      if (!"".equals(context.getOrigin())) {
          Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
          context.getCurEntry().setOriginNode(originNode);
      }

      fireEntry(context, resourceWrapper, node, count, prioritized, args);
  }

The next processor is the LogSlot class, whose main function is to log exceptions, which I won’t go into here. The next processor, StatisticSlot, is a very important object that maintains statistics about traffic, threads, exceptions, and so on. The code is as follows:

  @Override
  public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
      try {
          // Run the checks of the following slots first
          fireEntry(context, resourceWrapper, node, count, prioritized, args);

          // Request passed: add thread count and pass count
          node.increaseThreadNum();
          node.addPassRequest(count);

          if (context.getCurEntry().getOriginNode() != null) {
              // Add count for origin node.
              context.getCurEntry().getOriginNode().increaseThreadNum();
              context.getCurEntry().getOriginNode().addPassRequest(count);
          }

          // Increase the global statistics
          if (resourceWrapper.getEntryType() == EntryType.IN) {
              // Add count for global inbound entry node for global statistics.
              Constants.ENTRY_NODE.increaseThreadNum();
              Constants.ENTRY_NODE.addPassRequest(count);
          }

          // Call the registered callbacks
          for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
              handler.onPass(context, resourceWrapper, node, count, args);
          }
      } catch (PriorityWaitException ex) {
          node.increaseThreadNum();
          if (context.getCurEntry().getOriginNode() != null) {
              // Add count for origin node.
              context.getCurEntry().getOriginNode().increaseThreadNum();
          }

          if (resourceWrapper.getEntryType() == EntryType.IN) {
              // Add count for global inbound entry node for global statistics.
              Constants.ENTRY_NODE.increaseThreadNum();
          }
          // Handle pass event with registered entry callback handlers.
          for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
              handler.onPass(context, resourceWrapper, node, count, args);
          }
      } catch (BlockException e) {
          // Blocked, set block exception to current entry.
          context.getCurEntry().setBlockError(e);

          // Add block count statistics
          node.increaseBlockQps(count);
          if (context.getCurEntry().getOriginNode() != null) {
              context.getCurEntry().getOriginNode().increaseBlockQps(count);
          }

          if (resourceWrapper.getEntryType() == EntryType.IN) {
              // Add count for global inbound entry node for global statistics.
              Constants.ENTRY_NODE.increaseBlockQps(count);
          }

          // Handle block event with registered entry callback handlers.
          for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
              handler.onBlocked(e, context, resourceWrapper, node, count, args);
          }

          throw e;
      } catch (Throwable e) {
          // Unexpected internal error, set error to current entry.
          context.getCurEntry().setError(e);
          throw e;
      }
  }
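The key point of StatisticSlot is the ordering: it runs the remaining slots first and records the outcome afterwards, so a request counts as passed only if no later check rejected it. A stripped-down sketch of that ordering follows. The names StatisticSketch, RuleCheck, and BlockedException are hypothetical stand-ins, and plain LongAdder counters replace Sentinel's Node statistics.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for a "request was blocked" exception. */
final class BlockedException extends Exception {
    BlockedException(String message) {
        super(message);
    }
}

/** Records pass and block counts around a downstream rule check. */
final class StatisticSketch {
    /** The remaining checks in the chain, reduced to a single callback. */
    interface RuleCheck {
        void check() throws BlockedException;
    }

    final LongAdder passCount = new LongAdder();
    final LongAdder blockCount = new LongAdder();
    final LongAdder threadNum = new LongAdder();

    void entry(RuleCheck downstream) throws BlockedException {
        try {
            // Run the later checks first; they throw if the request is rejected.
            downstream.check();
            // Reaching this line means every check passed.
            threadNum.increment();
            passCount.increment();
        } catch (BlockedException e) {
            // A later check rejected the request: count it and rethrow.
            blockCount.increment();
            throw e;
        }
    }
}
```

Because the counters are updated after the checks, the rule checks of the current request always see statistics that exclude the request itself.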

The next two slots, SystemSlot and AuthoritySlot, are not covered here. We focus on the FlowSlot class, where the flow control check is performed. The code is relatively simple:

  @Override
  public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
      checkFlow(resourceWrapper, context, node, count, prioritized);

      fireEntry(context, resourceWrapper, node, count, prioritized, args);
  }

FlowRuleChecker’s checkFlow method is called:

  public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                        Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
      if (ruleProvider == null || resource == null) {
          return;
      }
      // Get the flow rules defined for this resource
      Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
      if (rules != null) {
          for (FlowRule rule : rules) {
              if (!canPassCheck(rule, context, node, count, prioritized)) {
                  throw new FlowException(rule.getLimitApp(), rule);
              }
          }
      }
  }

Next, go to the canPassCheck method:

  public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                              boolean prioritized) {
      String limitApp = rule.getLimitApp();
      if (limitApp == null) {
          return true;
      }

      // In cluster mode, perform the cluster flow check
      if (rule.isClusterMode()) {
          return passClusterCheck(rule, context, node, acquireCount, prioritized);
      }

      return passLocalCheck(rule, context, node, acquireCount, prioritized);
  }

  private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
      // Select the node whose statistics decide whether the request passes
      Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
      if (selectedNode == null) {
          return true;
      }

      return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
  }

Finally, the canPass method of the traffic shaping controller is called. Here is the implementation of DefaultController, the default traffic shaping controller

  @Override
  public boolean canPass(Node node, int acquireCount, boolean prioritized) {
      // Tokens already used according to the current statistics
      int curCount = avgUsedTokens(node);
      // Reject if this request would exceed the threshold
      if (curCount + acquireCount > count) {
          if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
              long currentTime;
              long waitInMs;
              currentTime = TimeUtil.currentTimeMillis();
              waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
              if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                  node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                  node.addOccupiedPass(acquireCount);
                  sleep(waitInMs);

                  // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                  throw new PriorityWaitException(waitInMs);
              }
          }
          return false;
      }
      return true;
  }
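Leaving the prioritized branch aside, the decision in canPass reduces to one comparison: reject when the tokens already used plus the tokens requested exceed the configured count. The sketch below shows that comparison in isolation. SimpleQpsLimiter is a hypothetical class, not Sentinel's DefaultController; it assumes a fixed one-second window rather than a sliding one, and it records granted permits itself, whereas in Sentinel the statistics are maintained by StatisticSlot.

```java
/**
 * Hypothetical fixed-window QPS limiter illustrating the threshold check.
 * Timestamps are passed in explicitly and assumed to be non-negative.
 */
final class SimpleQpsLimiter {
    private final double limit;       // maximum permits per second
    private long windowStartMs = -1;  // start of the current one-second window
    private long used;                // permits already granted in this window

    SimpleQpsLimiter(double limit) {
        this.limit = limit;
    }

    /** Returns true and records the permits if the request fits under the limit. */
    synchronized boolean canPass(long nowMs, int acquireCount) {
        long start = nowMs - nowMs % 1000;
        if (start != windowStartMs) {
            // A new second has begun: start counting from zero.
            windowStartMs = start;
            used = 0;
        }
        if (used + acquireCount > limit) {
            return false; // same comparison as curCount + acquireCount > count
        }
        used += acquireCount;
        return true;
    }
}
```

With a limit of 20, as in the HelloWorld rule earlier, the first 20 single-permit requests within a second pass and later ones in that second are rejected until the next window starts.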

This article analyzed the principle and main workflow of Sentinel's flow control. Its core is the ProcessorSlotChain; once you understand it, you have grasped the main thread of the design. Circuit breaking and degradation will be covered in a follow-up article.