I. Background

1. Application scenarios

This section describes the application scenarios of cluster flow control.

Why cluster flow control? Suppose we want to limit the total QPS of an API, per user, to 50, but the number of machines may be large (say, 100). A natural idea is to have one server count the total number of calls, and have all other instances ask this server whether a call is allowed. This is the most basic form of cluster flow control.

Cluster flow control also solves the problem of poor overall limiting caused by uneven traffic. Suppose the cluster has 10 machines and each machine has a single-machine threshold of 10 QPS; ideally the cluster-wide threshold is 100 QPS. In practice, however, traffic to each machine may be uneven, so some machines start limiting before the cluster total is reached. The total flow therefore cannot be limited accurately from the single-machine dimension alone. Cluster flow control accurately controls the total number of calls in the whole cluster and, combined with single-machine flow control as a fallback, achieves a better overall effect.

Cluster flow control involves two roles:

  • Token Client: the cluster flow control client. It communicates with the owning Token Server to request tokens; the cluster limiting server returns the result, which decides whether the local request is limited.
  • Token Server: the cluster flow control server. It processes token requests from Token Clients and decides whether to issue tokens based on the configured cluster rules.

2. Usage

Refer to github.com/alibaba/Sen… .
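
For orientation, here is a minimal sketch of what declaring a cluster flow rule looks like in code. The resource name "demo-resource", the flowId value 10001, and the choice of a global threshold are illustrative assumptions, not taken from the referenced demo:

// Classes come from sentinel-core and the sentinel-cluster modules.
FlowRule rule = new FlowRule("demo-resource");   // hypothetical resource name
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(50);                               // cluster-wide threshold when thresholdType is global
rule.setClusterMode(true);                       // enable cluster flow control

ClusterFlowConfig clusterConfig = new ClusterFlowConfig();
clusterConfig.setFlowId(10001L);                 // globally unique rule ID (illustrative value)
clusterConfig.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
clusterConfig.setFallbackToLocalWhenFail(true);
rule.setClusterConfig(clusterConfig);

FlowRuleManager.loadRules(Collections.singletonList(rule));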

3. About the console

In addition, the official documentation mentions that managing cluster flow control from the console requires secondary development of the console.

II. TokenServer startup modes

TokenServer can be started in two modes: embedded TokenServer and standalone TokenServer.

1. Embedded TokenServer

The key point of the embedded TokenServer is that every node is an ordinary application instance (a peer): any node may become the TokenServer, and any node may become a TokenClient.

An official demo of the embedded TokenServer is provided. It uses Sentinel's InitFunc extension point: when Sentinel's Env class is loaded and initialized, the init method of every InitFunc implementation is executed.

public class DemoClusterInitFunc implements InitFunc {
    @Override
    public void init() throws Exception {
        // 1. Load FlowRule flow control rules into FlowRuleManager
        initDynamicRuleProperty();
        // 2. [ClusterClientConfig] Load the TokenClient general configuration, such as the TokenServer request timeout
        initClientConfigProperty();
        // 3. [ClusterGroupEntity] Load the client/server assignment configuration: which machines are TokenClients and which is the TokenServer
        initClientServerAssignProperty();
        // 4. Load ClusterFlowRule rules into ClusterFlowRuleManager; these are rules derived specially for cluster flow control
        registerClusterRuleSupplier();
        // 5. [ServerTransportConfig] Load the TokenServer transport configuration, such as the port
        initServerTransportConfigProperty();
        // 6. [ClusterGroupEntity] Load the current embedded node's state (TokenClient, TokenServer, or not started)
        initStateProperty();
    }
}

For the embedded TokenServer to work, the following configurations need to be loaded (the demo uses Nacos as the dynamic data source for all of them):

  1. Ordinary flow control rules, including flow control rules and hot-spot parameter flow control rules; for cluster mode, the clusterMode attribute of FlowRule must be true.

    // DemoClusterInitFunc
    private void initDynamicRuleProperty() {
        ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(remoteAddress, groupId,
            flowDataId, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
        FlowRuleManager.register2Property(ruleSource.getProperty());
    }

    public class FlowRule extends AbstractRule {
        // Whether cluster mode is enabled
        private boolean clusterMode;
        // Cluster flow control configuration
        private ClusterFlowConfig clusterConfig;
    }
  2. TokenClient-side configuration: because every node of an embedded TokenServer deployment is a peer, any node may become a TokenClient;

    1) initClientConfigProperty loads the general client configuration. Currently ClusterClientConfig only contains the timeout for requests to the TokenServer;

    // DemoClusterInitFunc
    private void initClientConfigProperty() {
           ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(remoteAddress, groupId,
               configDataId, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {}));
           ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());
       }
    
    public class ClusterClientConfig {
       private Integer requestTimeout;
    }

    2) initClientServerAssignProperty loads the client/server assignment configuration, i.e., which machines are TokenClients and which machine is the TokenServer. Currently only the IP address and port of the TokenServer of the cluster that the node belongs to are kept (a sample of the underlying configuration is shown after this list).

    // DemoClusterInitFunc
    private void initClientServerAssignProperty() {
       ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(remoteAddress, groupId,
           clusterMapDataId, source -> {
           List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
           return Optional.ofNullable(groupList)
               .flatMap(this::extractClientAssignment)
               .orElse(null);
       });
       ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());
    }
    public class ClusterClientAssignConfig {
           // TokenServer ip
       private String serverHost;
       // TokenServer port
       private Integer serverPort;
    }
  3. TokenServer configuration:

    1) Network transport configuration; currently there is only the TokenServer listen port;

    // DemoClusterInitFunc
    private void initServerTransportConfigProperty() {
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
            return Optional.ofNullable(groupList)
                .flatMap(this::extractServerTransportConfig)
                .orElse(null);
        });
        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
    }

    public class ServerTransportConfig {
        // TokenServer listen port
        private int port;
    }

    2) Cluster flow control rule configuration: if cluster flow control is used, the TokenServer must load the cluster flow control rules. The data source is still the flow control rule FlowRule, but ClusterFlowRuleManager converts the FlowRule internally, as we will see later.

    // DemoClusterInitFunc
    private void registerClusterRuleSupplier() {
        ClusterFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                namespace + DemoConstants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
            return ds.getProperty();
        });
    }
  4. Current node state configuration: sets whether the current node is a TokenServer or a TokenClient. The demo parses the Nacos configuration model into an Integer state; the parsing logic is data-source specific, since the source could also be something else, such as the Sentinel dashboard;

    // DemoClusterInitFunc
    private void initStateProperty() {
        ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(remoteAddress, groupId,
            clusterMapDataId, source -> {
            List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {});
            return Optional.ofNullable(groupList)
                .map(this::extractMode)
                .orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
        });
        ClusterStateManager.registerProperty(clusterModeDs.getProperty());
    }

    private int extractMode(List<ClusterGroupEntity> groupList) {
        // 1. If the current machine matches a TokenServer entry, it becomes the TokenServer
        if (groupList.stream().anyMatch(this::machineEqual)) {
            return ClusterStateManager.CLUSTER_SERVER;
        }
        // 2. If the current machine is in some server's client set, it becomes a TokenClient; otherwise it is not started
        boolean canBeClient = groupList.stream()
            .flatMap(e -> e.getClientSet().stream())
            .filter(Objects::nonNull)
            .anyMatch(e -> e.equals(getCurrentMachineId()));
        return canBeClient ? ClusterStateManager.CLUSTER_CLIENT : ClusterStateManager.CLUSTER_NOT_STARTED;
    }
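
For reference, the assignment, transport, and state configurations above all parse the same clusterMapDataId entry, whose content is a JSON list of ClusterGroupEntity. A hypothetical sample (the hosts, ports, and the "ip@commandPort" machineId format are assumptions based on the demo's conventions) might look like the following, embedded here as a Java string:

// A hypothetical cluster-map payload, shown as a Java string for illustration.
// machineId identifies the TokenServer node; clientSet lists the machine IDs
// of the nodes that should become TokenClients.
String sampleClusterMap = "[{"
    + "\"machineId\": \"192.168.0.10@8720\","
    + "\"ip\": \"192.168.0.10\","
    + "\"port\": 11111,"
    + "\"clientSet\": [\"192.168.0.11@8720\", \"192.168.0.12@8720\"]"
    + "}]";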

2. Standalone TokenServer

A standalone TokenServer means a separate process is started solely to issue tokens to the application instances.

Here’s the official demo:

public class ClusterServerDemo {
    public static void main(String[] args) throws Exception {
        // Create the TokenServer
        ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
        // Load the TokenServer transport configuration
        ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig()
            .setIdleSeconds(600)
            .setPort(11111));
        // Load the namespace set -- used to distinguish different clusters
        ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton(DemoConstants.APP_NAME));
        // Start the TokenServer
        tokenServer.start();
    }
}

public class DemoClusterServerInitFunc implements InitFunc {

    private final String remoteAddress = "localhost:8848";
    private final String groupId = "SENTINEL_GROUP";
    private final String namespaceSetDataId = "cluster-server-namespace-set";
    private final String serverTransportDataId = "cluster-server-transport-config";

    @Override
    public void init() throws Exception {
        // Dynamic cluster flow control rules, one data source per namespace
        ClusterFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<FlowRule>> ds = new NacosDataSource<>(remoteAddress, groupId,
                namespace + DemoConstants.FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
            return ds.getProperty();
        });
        // Dynamic namespace set configuration
        ReadableDataSource<String, Set<String>> namespaceDs = new NacosDataSource<>(remoteAddress, groupId,
            namespaceSetDataId, source -> JSON.parseObject(source, new TypeReference<Set<String>>() {}));
        ClusterServerConfigManager.registerNamespaceSetProperty(namespaceDs.getProperty());
        // Dynamic transport configuration
        ReadableDataSource<String, ServerTransportConfig> transportConfigDs = new NacosDataSource<>(remoteAddress, groupId,
            serverTransportDataId, source -> JSON.parseObject(source, new TypeReference<ServerTransportConfig>() {}));
        ClusterServerConfigManager.registerServerTransportProperty(transportConfigDs.getProperty());
    }
}

A standalone TokenServer needs to load the following configurations:

  1. Cluster flow control rules: same as the embedded TokenServer; generally converted from single-machine flow control rules;
  2. Namespace configuration: a standalone TokenServer can issue tokens to multiple clusters; different clusters are distinguished by namespace;
  3. Network transport configuration: same as the embedded TokenServer.

III. ClusterFlowConfig

To configure a cluster flow control rule, enable clusterMode in FlowRule. When clusterMode is true, cluster flow control is enabled and many attributes of the local flow control rule no longer take effect.

public class FlowRule extends AbstractRule {
    // Threshold type -- in cluster mode only QPS is supported
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    // Flow control threshold
    private double count;
    // Flow control strategy based on the invocation relation
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    // Associated resource
    private String refResource;
    // Flow control effect: 0 - fail fast, 1 - Warm Up, 2 - queue and wait -- invalid in cluster mode
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    // Warm-up period (s) -- invalid in cluster mode
    private int warmUpPeriodSec = 10;
    // Max queueing time (ms) -- invalid in cluster mode
    private int maxQueueingTimeMs = 500;
    // Whether cluster mode is enabled
    private boolean clusterMode;
    // Cluster flow control configuration
    private ClusterFlowConfig clusterConfig;
    // TrafficShapingController -- invalid in cluster mode
    private TrafficShapingController controller;
}

The cluster flow control rule itself is configured through ClusterFlowConfig; currently only some of its attributes take effect.

public class ClusterFlowConfig {
    // Globally unique ID: each cluster flow rule within a cluster has its own ID
    private Long flowId;
    // Cluster threshold mode: 0 - average to each node, 1 - global threshold
    private int thresholdType = ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL;
    // Whether to fall back to local rule checking on failure, default true
    private boolean fallbackToLocalWhenFail = true;
    private int strategy = ClusterRuleConstant.FLOW_CLUSTER_STRATEGY_NORMAL;
    // Sample count, default 10
    private int sampleCount = ClusterRuleConstant.DEFAULT_CLUSTER_SAMPLE_COUNT;
    // Window size, default 1s
    private int windowIntervalMs = RuleConstant.DEFAULT_WINDOW_INTERVAL_MS;
    // Token holding timeout, default 2000ms
    private long resourceTimeout = 2000;
    // Strategy after a held token expires: 0 - ignore, 1 - release; default 0 -- invalid
    private int resourceTimeoutStrategy = RuleConstant.DEFAULT_RESOURCE_TIMEOUT_STRATEGY;
    // Strategy when a token request is rejected, default 0
    private int acquireRefuseStrategy = RuleConstant.DEFAULT_BLOCK_STRATEGY;
    // Client offline time, default 2000ms
    private long clientOfflineTime = 2000;
}

IV. FlowSlot

We have already seen FlowSlot's local rule verification logic for flow control rules. Here we look at how cluster flow control differs from local flow control.

// FlowRuleChecker.java
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                            boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    // Cluster mode goes through the cluster check
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    // Otherwise fall back to the local check
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

The client-side flow of cluster flow control is simple:

  1. Obtain the TokenService that issues tokens;
  2. Call the TokenService to obtain a token;
  3. Process the token result.
// FlowRuleChecker.java
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                        boolean prioritized) {
    try {
        // 1. Pick the TokenService
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        // 2. Request the token
        long flowId = rule.getClusterConfig().getFlowId();
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        // 3. Handle the token result
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

TokenService exposes the token-request methods: requestToken for ordinary cluster flow rules and requestParamToken for hot-spot parameter rules.

public interface TokenService {
    // Request tokens for an ordinary cluster flow rule
    TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized);
    // Request tokens for a hot-spot parameter cluster flow rule
    TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params);
}

Get TokenService

First, pickClusterService obtains the TokenService. If the current node's state is TokenClient, the ClusterTokenClient instance is returned; if the current node's state is TokenServer, the EmbeddedClusterTokenServer instance is returned. Only an embedded TokenServer can be in the latter situation.

// FlowRuleChecker.java
private static TokenService pickClusterService() {
    if (ClusterStateManager.isClient()) {
        return TokenClientProvider.getClient();
    }
    if (ClusterStateManager.isServer()) {
        return EmbeddedClusterTokenServerProvider.getServer();
    }
    return null;
}

Get Token

Second, the token is obtained via TokenService's requestToken.

If the current node is a TokenClient, it assembles the cluster flow rule ID (FlowRule.clusterConfig.flowId), the number of tokens to acquire (acquireCount), and the priority flag (prioritized), and requests the TokenServer.

// DefaultClusterTokenClient.java
@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
    if (notValidRequest(flowId, acquireCount)) {
        return badRequest();
    }
    FlowRequestData data = new FlowRequestData().setCount(acquireCount)
        .setFlowId(flowId).setPriority(prioritized);
    ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
    try {
        TokenResult result = sendTokenRequest(request);
        logForResult(result);
        return result;
    } catch (Exception ex) {
        ClusterClientStatLogUtil.log(ex.getMessage());
        return new TokenResult(TokenResultStatus.FAIL);
    }
}

If the current node is an embedded TokenServer, then it is itself the TokenServer, and the DefaultEmbeddedTokenServer instance delegates to DefaultTokenService to obtain the token.

In addition, if the current node is a standalone TokenServer and receives FlowRequestData from a TokenClient, it also goes through DefaultTokenService to obtain the token.

public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {

    private final TokenService tokenService = TokenServiceProvider.getService();

    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        if (tokenService != null) {
            return tokenService.requestToken(ruleId, acquireCount, prioritized);
        }
        return new TokenResult(TokenResultStatus.FAIL);
    }
}

The requestToken method of DefaultTokenService is the core logic of cluster flow control.

If the request parameters are invalid, BAD_REQUEST is returned.

The flow control rule is then looked up by the cluster flow rule ID; if no rule is found, NO_RULE_EXISTS is returned.

@Spi(isDefault = true)
public class DefaultTokenService implements TokenService {

    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest(); // BAD_REQUEST
        }
        // The rule should be valid.
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }
        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }

If the flow control rule exists, execution enters the acquireClusterToken method of ClusterFlowChecker.

// ClusterFlowChecker.java
static TokenResult acquireClusterToken(FlowRule rule, int acquireCount, boolean prioritized) {
    Long id = rule.getClusterConfig().getFlowId();
    // A global limiter caps the namespace-level QPS of token requests, 30,000 by default
    if (!allowProceed(id)) {
        return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
    }
    // Cluster traffic metrics for this rule
    ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
    if (metric == null) {
        return new TokenResult(TokenResultStatus.FAIL);
    }
    // Current passing QPS
    double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
    // Global threshold = configured threshold * exceed-count factor
    double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
    double nextRemaining = globalThreshold - latestQps - acquireCount;

    if (nextRemaining >= 0) {
        // Record the passed traffic and return OK
        // TODO: checking logic and metric operation should be separated.
        metric.add(ClusterFlowEvent.PASS, acquireCount);
        metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
        if (prioritized) {
            metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
        }
        return new TokenResult(TokenResultStatus.OK)
            .setRemaining((int) nextRemaining)
            .setWaitInMs(0);
    } else {
        // If prioritized=true, try to occupy tokens of the next window and return SHOULD_WAIT with waitInMs
        if (prioritized) {
            double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
            if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
                int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
                if (waitInMs > 0) {
                    ClusterServerStatLogUtil.log("flow|waiting|" + id);
                    return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                        .setRemaining(0)
                        .setWaitInMs(waitInMs);
                }
            }
        }
        // Otherwise record the blocked traffic and return BLOCKED
        metric.add(ClusterFlowEvent.BLOCK, acquireCount);
        metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
        ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
        ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
        if (prioritized) {
            metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
            ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
        }
        return blockedResult();
    }
}

First, the TokenServer applies a global rate limit to the token-request interface; when the limit is exceeded, TOO_MANY_REQUEST is returned.

The owning namespace is located by the cluster flow rule ID. Each namespace has a RequestLimiter that caps the rate at which the namespace may request tokens from the TokenServer. By default the default namespace is used, with a limit of 30,000 QPS.

// ClusterFlowChecker.java
static boolean allowProceed(long flowId) {
    String namespace = ClusterFlowRuleManager.getNamespace(flowId);
    return GlobalRequestLimiter.tryPass(namespace);
}
// GlobalRequestLimiter.java
 public static boolean tryPass(String namespace) {
   if (namespace == null) {
     return false;
   }
   RequestLimiter limiter = GLOBAL_QPS_LIMITER_MAP.get(namespace);
   if (limiter == null) {
     return true;
   }
   return limiter.tryPass();
 }
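
If the default 30,000 QPS cap needs adjusting, the server-side global flow configuration can be reloaded. A minimal sketch, assuming ClusterServerConfigManager's loadGlobalFlowConfig and the fluent ServerFlowConfig setters; the 50,000 value is purely illustrative:

// Raise the per-namespace token-request QPS cap (value is an assumption for illustration).
ClusterServerConfigManager.loadGlobalFlowConfig(new ServerFlowConfig()
    .setMaxAllowedQps(50000d));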

Second, the number of remaining tokens is calculated: remaining = threshold - current QPS - number of requested tokens. If the threshold type is the global threshold, the configured threshold is used directly; if the threshold type is average-to-node, threshold = configured threshold × number of clients connected for the current cluster flow rule.

// ClusterFlowChecker.java
private static double calcGlobalThreshold(FlowRule rule) {
    double count = rule.getCount();
    switch (rule.getClusterConfig().getThresholdType()) {
        case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
            // Global threshold: use the configured value directly
            return count;
        case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
        default:
            // Average-to-node: configured value * number of connected clients for this rule
            int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
            return count * connectedCount;
    }
}

Third, if the number of remaining tokens is greater than or equal to 0, the token is acquired: the passed traffic is recorded and OK is returned.

Fourth, if the number of remaining tokens is less than 0, the token is not acquired, and the handling resembles local flow control.

If prioritized=true, the request tries to occupy tokens of the next window: the client receives a SHOULD_WAIT status along with the wait time. The precondition is that the ClusterFlowEvent.WAITING count does not exceed the occupancy threshold (maxOccupyRatio × globalThreshold).

If prioritized=false, or prioritized=true but the attempt to occupy the next window fails, BLOCKED is returned to the client.
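
A worked example with assumed numbers may make the arithmetic concrete; the sketch below simply replays the remaining-token computation from acquireClusterToken:

// Hypothetical numbers replaying the remaining-token arithmetic above.
double count = 10;              // configured threshold in the FlowRule
int connectedCount = 3;         // clients connected for this flowId
double exceedCount = 1.0;       // ClusterServerConfigManager.getExceedCount() default
double globalThreshold = count * connectedCount * exceedCount; // 30.0

double latestQps = 28;          // current passing QPS from the cluster metric
int acquireCount = 1;           // tokens requested
double nextRemaining = globalThreshold - latestQps - acquireCount; // 1.0

// nextRemaining >= 0, so this request gets TokenResultStatus.OK with remaining = 1;
// had latestQps been 30, nextRemaining would be -1 and the request would be
// blocked (or, with prioritized=true, possibly SHOULD_WAIT).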

Processing TokenResult

The TokenResult returned by the TokenServer, or by the node itself if it is an embedded TokenServer, is handled differently depending on its status code:

  • OK: returns true;
  • SHOULD_WAIT: the request with prioritized=true obtained a token for the next window; the client sleeps for the wait time and then returns true;
  • NO_RULE_EXISTS, BAD_REQUEST, FAIL, TOO_MANY_REQUEST: in these cases the cluster flow rule can be degraded to local rule checking (when fallbackToLocalWhenFail=true);
  • BLOCKED: the current request is blocked by cluster flow control; returns false.
// FlowRuleChecker.java
private static boolean applyTokenResult(TokenResult result, FlowRule rule, Context context, DefaultNode node,
                                        int acquireCount, boolean prioritized) {
    switch (result.getStatus()) {
        case TokenResultStatus.OK:
            // Token acquired
            return true;
        case TokenResultStatus.SHOULD_WAIT:
            // Wait for next tick.
            try {
                Thread.sleep(result.getWaitInMs());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        case TokenResultStatus.NO_RULE_EXISTS:   // The cluster flow rule does not exist
        case TokenResultStatus.BAD_REQUEST:      // Invalid request parameters
        case TokenResultStatus.FAIL:             // The cluster metric does not exist
        case TokenResultStatus.TOO_MANY_REQUEST: // Rejected by the global request limiter
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        case TokenResultStatus.BLOCKED:          // Blocked by cluster flow control
        default:
            return false;
    }
}
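
For completeness, the degradation path used above is FlowRuleChecker's fallbackToLocalOrPass, whose logic is roughly the following: if fallbackToLocalWhenFail is enabled, the rule is checked locally; otherwise the request simply passes.

// FlowRuleChecker.java (simplified)
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node,
                                             int acquireCount, boolean prioritized) {
    if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
        // Degrade to the local flow check
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }
    // Otherwise the rule is skipped and the request passes
    return true;
}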

V. Network communication

This completes the cluster flow control logic itself, which is fairly simple.

Let’s look at the underlying network communication between TokenClient and TokenServer.

1. TokenServer

ClusterTokenServer is the abstract interface of the TokenServer, declaring two methods: start and stop.

public interface ClusterTokenServer {
    void start() throws Exception;
    void stop() throws Exception;
}

NettyTransportServer is what actually implements the underlying network communication.

public class NettyTransportServer implements ClusterTokenServer {

    private final int port;
    // Netty acceptor event loop
    private NioEventLoopGroup bossGroup;
    // Netty IO + business thread pool
    private NioEventLoopGroup workerGroup;
    // Client connection pool
    private final ConnectionPool connectionPool = new ConnectionPool();
    // Server state
    private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF);
    // Startup failure count
    private final AtomicInteger failedTimes = new AtomicInteger(0);

    public NettyTransportServer(int port) {
        this.port = port;
    }

    @Override
    public void start() {
        if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) {
            return;
        }
        ServerBootstrap b = new ServerBootstrap();
        this.bossGroup = new NioEventLoopGroup(1);
        this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
        b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    // Length-field-based frame decoding
                    p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                    p.addLast(new NettyRequestDecoder());
                    p.addLast(new LengthFieldPrepender(2));
                    p.addLast(new NettyResponseEncoder());
                    // Business handler
                    p.addLast(new TokenServerHandler(connectionPool));
                }
            })
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
            .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
            .childOption(ChannelOption.SO_TIMEOUT, 10)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
        b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.cause() != null) {
                    RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port
                        + "), failedTimes: " + failedTimes.get(), future.cause());
                    currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF);
                    int failCount = failedTimes.incrementAndGet();
                    // Give up after MAX_RETRY_TIMES failures
                    if (failCount > MAX_RETRY_TIMES) {
                        return;
                    }
                    try {
                        Thread.sleep(failCount * RETRY_SLEEP_MS);
                        // Retry
                        start();
                    } catch (Throwable e) {
                        RecordLog.info("[NettyTransportServer] Failed to start token server when retrying", e);
                    }
                } else {
                    currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED);
                }
            }
        });
    }
}

Focus on a few points:

  1. Thread model: TokenServer uses Netty as the underlying communication framework. The boss group has 1 thread, responsible for accepting client connections; the worker group has cores × 2 threads, responsible for IO reads/writes and all business processing;
  2. Encoding and decoding: TokenServer uses length-field-based framing (LengthFieldBasedFrameDecoder, LengthFieldPrepender) to solve TCP packet splitting and sticking, together with its own business codecs (NettyRequestDecoder, NettyResponseEncoder);
  3. Retry on startup failure: TokenServer retries at most three times, sleeping failCount × 2 seconds before each retry.

For the embedded TokenServer, the implementation class is DefaultEmbeddedTokenServer, which delegates the start and stop logic to an underlying SentinelDefaultTokenServer. When ClusterStateManager switches the current node's state to TokenServer, DefaultEmbeddedTokenServer is started.

public class DefaultEmbeddedTokenServer implements EmbeddedClusterTokenServer {

    private final ClusterTokenServer server = new SentinelDefaultTokenServer(true);

    @Override
    public void start() throws Exception {
        server.start();
    }

    @Override
    public void stop() throws Exception {
        server.stop();
    }
}

For the standalone TokenServer, as in the official ClusterServerDemo, SentinelDefaultTokenServer is created and started directly.

public class ClusterServerDemo {
    public static void main(String[] args) throws Exception {
        // Create the TokenServer
        ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
        // ...
        tokenServer.start();
    }
}

Under the hood, SentinelDefaultTokenServer in turn delegates the start and stop methods to NettyTransportServer.

public class SentinelDefaultTokenServer implements ClusterTokenServer {
    // Whether this server is embedded in a business application
    private final boolean embedded;
    // The underlying NettyTransportServer
    private ClusterTokenServer server;
    private int port;

    public SentinelDefaultTokenServer() {
        this(false);
    }

    public SentinelDefaultTokenServer(boolean embedded) {
        this.embedded = embedded;
        ClusterServerConfigManager.addTransportConfigChangeObserver(new ServerTransportConfigObserver() {
            @Override
            public void onTransportConfigChange(ServerTransportConfig config) {
                changeServerConfig(config);
            }
        });
        initNewServer();
    }

    private void initNewServer() {
        if (server != null) {
            return;
        }
        int port = ClusterServerConfigManager.getPort();
        if (port > 0) {
            this.server = new NettyTransportServer(port);
            this.port = port;
        }
    }
}

2. TokenClient

When DefaultClusterTokenClient is constructed, it creates the ClusterTransportClient responsible for the TokenClient's underlying network communication.

public class DefaultClusterTokenClient implements ClusterTokenClient {

    private ClusterTransportClient transportClient;
    private TokenServerDescriptor serverDescriptor;
    private final AtomicBoolean shouldStart = new AtomicBoolean(false);

    public DefaultClusterTokenClient() {
        ClusterClientConfigManager.addServerChangeObserver(new ServerChangeObserver() {
            @Override
            public void onRemoteServerChange(ClusterClientAssignConfig assignConfig) {
                changeServer(assignConfig);
            }
        });
        initNewConnection();
    }

    private void initNewConnection() {
        if (transportClient != null) {
            return;
        }
        String host = ClusterClientConfigManager.getServerHost();
        int port = ClusterClientConfigManager.getServerPort();
        if (StringUtil.isBlank(host) || port <= 0) {
            return;
        }
        try {
            this.transportClient = new NettyTransportClient(host, port);
            this.serverDescriptor = new TokenServerDescriptor(host, port);
        } catch (Exception ex) {
            RecordLog.warn("[DefaultClusterTokenClient] Failed to initialize new token client", ex);
        }
    }
}

When ClusterStateManager switches the current node's state to TokenClient, DefaultClusterTokenClient is started.

// ClusterStateManager.java
public static boolean setToClient() {
    if (mode == CLUSTER_CLIENT) {
        return true;
    }
    mode = CLUSTER_CLIENT;
    sleepIfNeeded();
    lastModified = TimeUtil.currentTimeMillis();
    return startClient();
}

private static boolean startClient() {
    try {
        // If the current node was a TokenServer, stop it first
        EmbeddedClusterTokenServer server = EmbeddedClusterTokenServerProvider.getServer();
        if (server != null) {
            server.stop();
        }
        // Start the TokenClient
        ClusterTokenClient tokenClient = TokenClientProvider.getClient();
        if (tokenClient != null) {
            tokenClient.start();
            return true;
        } else {
            return false;
        }
    } catch (Exception ex) {
        return false;
    }
}

// DefaultClusterTokenClient.java
public void start() throws Exception {
    if (shouldStart.compareAndSet(false, true)) {
        startClientIfScheduled();
    }
}

private void startClientIfScheduled() throws Exception {
    if (shouldStart.get()) {
        if (transportClient != null) {
            transportClient.start();
        } else {
            // ...
        }
    }
}

At the bottom, DefaultClusterTokenClient starts the NettyTransportClient.

NettyTransportClient creates its Bootstrap in initClientBootstrap, and connect establishes the connection to the TokenServer. The TokenClient uses a single thread to handle both IO reads/writes and business processing.

// NettyTransportClient.java
private Bootstrap initClientBootstrap() {
    Bootstrap b = new Bootstrap();
    // 1 thread
    eventLoopGroup = new NioEventLoopGroup();
    b.group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                clientHandler = new TokenClientHandler(currentState, disconnectCallback);
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                pipeline.addLast(new NettyResponseDecoder());
                pipeline.addLast(new LengthFieldPrepender(2));
                pipeline.addLast(new NettyRequestEncoder());
                pipeline.addLast(clientHandler);
            }
        });
    return b;
}

private void connect(Bootstrap b) {
    if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
        b.connect(host, port)
            .addListener(new GenericFutureListener<ChannelFuture>() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.cause() != null) {
                        // Connection failed
                        failConnectedTime.incrementAndGet();
                        channel = null;
                    } else {
                        failConnectedTime.set(0);
                        channel = future.channel();
                    }
                }
            });
    }
}

Summary

The purposes of cluster flow control are:

  1. To let a single node collect cluster-wide traffic statistics and issue tokens to the cluster nodes based on the overall situation;
  2. To solve the problem that uneven traffic distribution across nodes of the same application makes the overall effect of local flow control poor.

There are two roles in cluster flow control:

  1. Token Client: the cluster flow control client. It communicates with the owning Token Server to request tokens; the cluster limiting server returns the result, which decides whether the local request is limited.
  2. Token Server: the cluster flow control server. It processes token requests from Token Clients and decides whether to issue tokens (whether to allow the calls) according to the configured cluster rules.

TokenServer has two startup modes:

  1. Embedded TokenServer: the TokenServer runs in the same process as the business application. All nodes of the business cluster are peers; each may become the TokenServer or a TokenClient, driven by an external data source (such as Nacos);
  2. Standalone TokenServer: a separate process is started, independent of the business application.

Cluster flow control configuration:

  1. The cluster flow control configuration lives inside FlowRule and takes effect when clusterMode is set to true.

  2. Important attributes of ClusterFlowConfig, the cluster flow control configuration, include:

    1. flowId: the globally unique ID of the cluster flow rule;
    2. thresholdType: the cluster threshold mode, 0 - average-to-node, 1 - global threshold. With average-to-node, the FlowRule threshold multiplied by the number of connected clients is used as the final verification threshold; with global, the FlowRule threshold is used as-is;
    3. fallbackToLocalWhenFail: whether to degrade to local rule checking when the TokenResult status returned by the TokenServer is NO_RULE_EXISTS, BAD_REQUEST, FAIL, or TOO_MANY_REQUEST.

FlowSlot verifies cluster flow control rules in three steps:

  1. Get the TokenService: if the current node is an embedded TokenServer, the local EmbeddedClusterTokenServer is used directly with no remote call; if the current node is a TokenClient, the ClusterTokenClient instance is obtained and the token must be requested from the TokenServer remotely.

  2. Obtaining a Token: The requestToken method of DefaultTokenService is the core method for obtaining a Token.

  3. TokenResult processing: Depending on the status code in TokenResult, the client performs flow control.

    • OK: returns true;
    • SHOULD_WAIT: the request with prioritized=true obtained a token for the next window; the client sleeps for the wait time and then returns true;
    • NO_RULE_EXISTS, BAD_REQUEST, FAIL, TOO_MANY_REQUEST: in these cases the cluster flow rule can be degraded to local rule checking (when fallbackToLocalWhenFail=true);
    • BLOCKED: the current request is blocked by cluster flow control; returns false.

Network communication:

TokenServer and TokenClient use Netty as the underlying network communication framework, and the client and server maintain a long connection.

In the thread model, the TokenServer starts 1 boss thread to accept client connections and cores × 2 worker threads for IO reads/writes and business processing; the TokenClient starts 1 thread to handle everything.

For encoding and decoding, length-field-based codecs are used to solve TCP packet sticking and splitting.