Goals

  • What is Resilience4j
  • Soul’s Resilience4j experience
    • Rate limiting
    • Circuit breaking
  • Resilience4j plug-in source code interpretation

What is Resilience4j

  • Resilience4j is a lightweight fault tolerance library recommended by Spring Cloud Gateway

  • Its design is inspired by Hystrix, and it is built for Java 8 functional programming, that is, lambda expressions

  • Netflix Hystrix has a compile dependency on Archaius, whereas Resilience4j is modular: you do not need to pull in every dependency and can reference only the modules you need. Hystrix is also no longer being updated, and Spring provides Resilience4j as an alternative to it

  • Resilience4j provides a number of features that improve the availability of microservices:

    • Circuit breaking (CircuitBreaker)
    • Rate limiting (RateLimiter)
    • Semaphore-based isolation (Bulkhead)
    • Caching (Cache)
    • Time limiting (TimeLimiter)
    • Retrying requests (Retry)
  • Officially provided dependency package

     <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-circuitbreaker</artifactId>
            <version>${resilience.version}</version>
     </dependency>
    

Soul’s Resilience4j experience

  • First, enable the Resilience4j plug-in under plug-in management in the soul-admin console

  • Add the dependency to the Soul gateway

      <dependency>
          <groupId>org.dromara</groupId>
          <artifactId>soul-spring-boot-starter-plugin-resilience4j</artifactId>
          <version>${project.version}</version>
      </dependency>
      
  • Start three services: soul-admin, soul-bootstrap, and soul-examples-http

  • In the soul-admin console, open Resilience4j in the plug-in list and add a custom configuration

  • Configuration parameters, as described on the Soul official website

    * timeoutDurationRate: timeout for waiting to acquire a token, in ms. Default: 5000.
    * limitRefreshPeriod: interval between token refreshes, in ms. Default: 500.
    * limitForPeriod: number of tokens issued per refresh. Default: 50.
    * circuitEnable: whether circuit breaking is enabled. 0: disabled, 1: enabled. Default: 0.
    * timeoutDuration: circuit breaker timeout, in ms. Default: 30000.
    * fallbackUri: the URI that handles degraded (fallback) requests.
    * slidingWindowSize: size of the sliding window. Default: 100.
    * slidingWindowType: type of the sliding window. 0: count based, 1: time based. Default: 0.
    * minimumNumberOfCalls: minimum number of calls required before the circuit breaker starts calculating the failure rate. Default: 100.
    * waitIntervalFunctionInOpenState: how long the circuit breaker stays open, in ms. Default: 10.
    * permittedNumberOfCallsInHalfOpenState: ring buffer size in the half-open state; this many calls must complete before the failure rate is calculated. Default: 10.
    * failureRateThreshold: failure rate threshold, as a percentage. The circuit breaker opens only when this threshold is reached. Default: 50.
    * automaticTransitionFromOpenToHalfOpenEnabled: whether to move from the open state to the half-open state automatically. true: yes, false: no. Default: false.
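To make the interaction of slidingWindowSize, minimumNumberOfCalls, and failureRateThreshold more concrete, here is a minimal JDK-only sketch of a count-based sliding window. It is a simplified illustration, not Resilience4j's actual implementation; the class name SlidingWindowSketch and the method recordAndCheck are hypothetical, and the small values used in main are chosen only to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified count-based sliding window (illustrative only, not Resilience4j code). */
class SlidingWindowSketch {
    private final int slidingWindowSize;
    private final int minimumNumberOfCalls;
    private final float failureRateThreshold; // percentage, e.g. 50 means 50%
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = failed call
    private int failures;

    SlidingWindowSketch(int slidingWindowSize, int minimumNumberOfCalls, float failureRateThreshold) {
        this.slidingWindowSize = slidingWindowSize;
        this.minimumNumberOfCalls = minimumNumberOfCalls;
        this.failureRateThreshold = failureRateThreshold;
    }

    /** Records one call outcome and reports whether the breaker would open. */
    boolean recordAndCheck(boolean failed) {
        // Evict the oldest outcome once the window is full.
        if (outcomes.size() == slidingWindowSize && outcomes.removeFirst()) {
            failures--;
        }
        outcomes.addLast(failed);
        if (failed) {
            failures++;
        }
        // No failure rate is calculated until enough calls were recorded.
        if (outcomes.size() < minimumNumberOfCalls) {
            return false;
        }
        float failureRate = 100f * failures / outcomes.size();
        return failureRate >= failureRateThreshold;
    }

    public static void main(String[] args) {
        // Illustrative values; the plug-in defaults are 100, 100 and 50.
        SlidingWindowSketch window = new SlidingWindowSketch(10, 4, 50f);
        boolean[] calls = {true, true, true, false};
        for (boolean failed : calls) {
            System.out.println("failed=" + failed + " -> open=" + window.recordAndCheck(failed));
        }
    }
}
```

With these values the first three failures do not open the breaker because fewer than four calls have been recorded; the fourth call brings the window to 3 failures out of 4, which is above the 50% threshold.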

Rate limiting

  • The configured parameters are validated as follows: if a parameter value is smaller than its default, the default is assigned instead. To test the effect, the source code is therefore modified directly so that 2 tokens are issued per refresh, tokens are refreshed every 1 s, and the timeout is 1 s

    /**
     * Check field default values.
     *
     * @param resilience4JHandle {@linkplain Resilience4JHandle}
     * @return {@linkplain Resilience4JHandle}
     */
    public Resilience4JHandle checkData(final Resilience4JHandle resilience4JHandle) {
        resilience4JHandle.setTimeoutDurationRate(Math.max(resilience4JHandle.getTimeoutDurationRate(), Constants.TIMEOUT_DURATION_RATE));
        //resilience4JHandle.setLimitRefreshPeriod(Math.max(resilience4JHandle.getLimitRefreshPeriod(), Constants.LIMIT_REFRESH_PERIOD));
        //resilience4JHandle.setLimitForPeriod(Math.max(resilience4JHandle.getLimitForPeriod(), Constants.LIMIT_FOR_PERIOD));
        // For testing: issue 2 tokens per refresh and refresh tokens every 1 s
        resilience4JHandle.setLimitRefreshPeriod(1000);
        resilience4JHandle.setLimitForPeriod(2);
        resilience4JHandle.setTimeoutDuration(1000);
        resilience4JHandle.setCircuitEnable(Math.max(resilience4JHandle.getCircuitEnable(), Constants.CIRCUIT_ENABLE));
        //resilience4JHandle.setTimeoutDuration(Math.max(resilience4JHandle.getTimeoutDuration(), Constants.TIMEOUT_DURATION));
        resilience4JHandle.setFallbackUri(!"0".equals(resilience4JHandle.getFallbackUri()) ? resilience4JHandle.getFallbackUri() : "");
        resilience4JHandle.setSlidingWindowSize(Math.max(resilience4JHandle.getSlidingWindowSize(), Constants.SLIDING_WINDOW_SIZE));
        resilience4JHandle.setSlidingWindowType(Math.max(resilience4JHandle.getSlidingWindowType(), Constants.SLIDING_WINDOW_TYPE));
        resilience4JHandle.setMinimumNumberOfCalls(Math.max(resilience4JHandle.getMinimumNumberOfCalls(), Constants.MINIMUM_NUMBER_OF_CALLS));
        resilience4JHandle.setWaitIntervalFunctionInOpenState(Math.max(resilience4JHandle.getWaitIntervalFunctionInOpenState(), Constants.WAIT_INTERVAL_FUNCTION_IN_OPEN_STATE));
        resilience4JHandle.setPermittedNumberOfCallsInHalfOpenState(Math.max(resilience4JHandle.getPermittedNumberOfCallsInHalfOpenState(), Constants.PERMITTED_NUMBER_OF_CALLS_IN_HALF_OPEN_STATE));
        resilience4JHandle.setFailureRateThreshold(Math.max(resilience4JHandle.getFailureRateThreshold(), Constants.FAILURE_RATE_THRESHOLD));
        return resilience4JHandle;
    }
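Why the source had to be edited becomes clear once the Math.max pattern in checkData is isolated. The following sketch is only an illustration: the class CheckDataSketch and the method effective are hypothetical, and the two constants simply repeat the defaults from the configuration description (500 ms and 50 tokens).

```java
/** Illustrates the Math.max clamping pattern used by checkData (hypothetical helper names). */
class CheckDataSketch {
    // Defaults taken from the configuration description; assumed to match Constants.
    static final int LIMIT_REFRESH_PERIOD = 500;
    static final int LIMIT_FOR_PERIOD = 50;

    /** The larger of the configured value and the default wins. */
    static int effective(int configured, int defaultValue) {
        return Math.max(configured, defaultValue);
    }

    public static void main(String[] args) {
        // A configured limitForPeriod of 2 is silently raised to the default.
        System.out.println(effective(2, LIMIT_FOR_PERIOD));        // prints 50
        // A configured limitRefreshPeriod of 1000 is kept because it exceeds the default.
        System.out.println(effective(1000, LIMIT_REFRESH_PERIOD)); // prints 1000
    }
}
```

Because a limitForPeriod of 2 can never survive this check, the test bypasses it by calling the setters with fixed values.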
  • Load test with SuperBenchmarker: 4 concurrent threads for 10 seconds

    C:\Users\v-yanb07>sb -u http://localhost:9195/http/test/findByUserId?userId=1 -c 4 -N 10
    Starting at 2021-03-14 15:46:28
    [Press C to stop the test]
    23      (RPS: 1)
    ---------------Finished!----------------
    Finished at 2021-03-14 15:46:51 (took 00:00:23.0477097)
    24      (RPS: 1)
    Status 200:    25

    RPS: 2.2 (requests/second)
    Max: 2020ms
    Min: 202ms
    Avg: 1677ms

      50%   below 1994ms
      60%   below 1997ms
      70%   below 1999ms
      80%   below 1999ms
      90%   below 2001ms
      95%   below 2019ms
      98%   below 2020ms
      99.9% below 2020ms
  • The log output

    2021-03-14 12:16:35.252  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:36.249  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:36.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:37.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:37.250  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:38.250  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:38.250  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:39.252  INFO 379336 --- [ctor-http-nio-7] o.d.s.e.h.controller.HttpTestController : rate limiting test
    2021-03-14 12:16:39.252  INFO 379336 --- [ctor-http-nio-4] o.d.s.e.h.controller.HttpTestController : rate limiting test

    The console prints two log lines per second, which matches the configured 2 tokens per 1 s refresh period and confirms that rate limiting takes effect
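The two-per-second pattern follows directly from limitForPeriod = 2 and limitRefreshPeriod = 1000 ms. As a rough sketch, the behaviour can be imitated with a fixed-window counter driven by explicit timestamps. This is a deliberate simplification and not how Resilience4j's RateLimiter is implemented: in particular, it rejects immediately instead of letting a caller wait up to the token timeout. The class FixedWindowLimiterSketch and its methods are hypothetical names.

```java
/** Simplified fixed-window limiter with an explicit clock (illustrative only). */
class FixedWindowLimiterSketch {
    private final int limitForPeriod;
    private final long limitRefreshPeriodMs;
    private long windowStart;
    private int used;

    FixedWindowLimiterSketch(int limitForPeriod, long limitRefreshPeriodMs) {
        this.limitForPeriod = limitForPeriod;
        this.limitRefreshPeriodMs = limitRefreshPeriodMs;
    }

    /** Returns true if a token is available in the window that contains nowMs. */
    boolean tryAcquire(long nowMs) {
        long elapsed = nowMs - windowStart;
        if (elapsed >= limitRefreshPeriodMs) {
            // Move to the start of the window containing nowMs and refill the tokens.
            windowStart = nowMs - elapsed % limitRefreshPeriodMs;
            used = 0;
        }
        if (used < limitForPeriod) {
            used++;
            return true;
        }
        return false;
    }

    /** Counts permitted requests when one request arrives every stepMs for durationMs. */
    static int countPermitted(int limitForPeriod, long refreshMs, long durationMs, long stepMs) {
        FixedWindowLimiterSketch limiter = new FixedWindowLimiterSketch(limitForPeriod, refreshMs);
        int permitted = 0;
        for (long t = 0; t < durationMs; t += stepMs) {
            if (limiter.tryAcquire(t)) {
                permitted++;
            }
        }
        return permitted;
    }

    public static void main(String[] args) {
        // 2 tokens per 1000 ms, one request every 250 ms for 4 s: 16 requests arrive.
        System.out.println(countPermitted(2, 1000, 4000, 250)); // prints 8
    }
}
```

Of the 16 simulated requests only 8 pass, that is, two per second, which is the same rate seen in the gateway log.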

Circuit breaking

  • The configuration shows that the circuit breaker is disabled by default, so it has to be enabled first (circuitEnable = 1)

  • Add a sleep to the interface called in soul-examples-http

    @GetMapping("/findByUserId")
    public UserDTO findByUserId(@RequestParam("userId") final String userId) throws Exception {
        UserDTO userDTO = new UserDTO();
        userDTO.setUserId(userId);
        userDTO.setUserName("hello world");
        log.info("rate limiting test");
        // Randomly delay some requests by 2 s to trigger the timeout
        int i = RandomUtils.nextInt(1, 3);
        if (i % 2 == 0) {
            //throw new Exception("exception");
            Thread.sleep(2000);
        }
        return userDTO;
    }

  • In Resilience4JHandle#checkData, manually set the timeout to 1 s

    resilience4JHandle.setTimeoutDuration(1000);
    
  • Call the interface: http://localhost:9195/http/test/findByUserId?userId=1

    If some requests return normal data and others return the following response, the timeout-based circuit breaking is taking effect

    {
    "code": 500,
    "message": "Internal Server Error",
    "data": "404 NOT_FOUND"
    }
    
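The mixed result (some normal responses, some errors) comes from the 2 s sleep racing against the 1 s timeout. The plug-in does this with Reactor, but the idea can be sketched with the JDK alone. The example below is an analogy under stated assumptions, not the plug-in's code: it uses CompletableFuture.orTimeout (Java 9+), and the class TimeoutFallbackSketch, the method callWithTimeout, and the two response strings are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy of "time out a slow call and answer with a fallback". */
class TimeoutFallbackSketch {

    /** Runs a task that takes workMs and falls back if it exceeds timeoutMs. */
    static String callWithTimeout(long workMs, long timeoutMs) {
        return CompletableFuture
                .supplyAsync(() -> {
                    sleep(workMs);
                    return "normal response";
                })
                // Completes the future with a TimeoutException once the limit is exceeded.
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                // Any failure, including the timeout, is mapped to the fallback value.
                .exceptionally(t -> "fallback response")
                .join();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(0, 1000));    // fast call finishes in time
        System.out.println(callWithTimeout(2000, 1000)); // slow call exceeds the 1 s limit
    }
}
```

A fast call returns its own result, while a call that sleeps for 2 s is cut off after 1 s and answered by the fallback, which mirrors the two kinds of responses observed through the gateway.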

Resilience4j plug-in source code interpretation

The Soul gateway's Resilience4j plug-in makes extensive use of reactive programming, so it helps to understand reactive programming before reading the source

  • The Resilience4j plug-in directory structure (only the recoverable part of the tree is shown)

    ...
    ├─executor
    │      CombinedExecutor.java              // rate limiting and circuit breaking executor
    │      Executor.java
    │      RatelimiterExecutor.java           // rate limiting executor
    ├─factory
    │      Resilience4JRegistryFactory.java   // builds the rate limiter and circuit breaker objects
    └─handler
           Resilience4JHandler.java
  • AbstractSoulPlugin is the base class of Soul plug-ins. If the plug-in is enabled, it is executed through the plug-in chain mechanism and ends up in the core method doExecute

    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        // Parse the rule's handle JSON into the plug-in configuration
        Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);
        // Check the configuration; values smaller than the default are replaced by the default
        resilience4JHandle = resilience4JHandle.checkData(resilience4JHandle);
        // circuitEnable == 1: use the circuit breaking component; otherwise use the rate limiting component
        if (resilience4JHandle.getCircuitEnable() == 1) {
            return combined(exchange, chain, rule);
        }
        return rateLimiter(exchange, chain, rule);
    }
  • Rate limiting: Resilience4JPlugin#rateLimiter

    private Mono<Void> rateLimiter(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        return ratelimiterExecutor.run(
                chain.execute(exchange), fallback(ratelimiterExecutor, exchange, null), Resilience4JBuilder.build(rule))
                .onErrorResume(throwable -> ratelimiterExecutor.withoutFallback(exchange, throwable));
    }

    // ratelimiterExecutor.run is called
    @Override
    public <T> Mono<T> run(final Mono<T> toRun, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf conf) {
        // Rate limiter component
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(conf.getId(), conf.getRateLimiterConfig());
        // Apply the rate limiter to the downstream call
        Mono<T> to = toRun.transformDeferred(RateLimiterOperator.of(rateLimiter));
        if (fallback != null) {
            return to.onErrorResume(fallback);
        }
        return to;
    }

    // to.onErrorResume(fallback);
    default Mono<Void> fallback(ServerWebExchange exchange, String uri, Throwable t) {
        if (StringUtils.isBlank(uri)) {
            return withoutFallback(exchange, t);
        }
        DispatcherHandler dispatcherHandler = SpringBeanUtils.getInstance().getBean(DispatcherHandler.class);
        ServerHttpRequest request = exchange.getRequest().mutate().uri(Objects.requireNonNull(UriUtils.createUri(uri))).build();
        ServerWebExchange mutated = exchange.mutate().request(request).build();
        // Dispatch the mutated request to the fallback URI
        return dispatcherHandler.handle(mutated);
    }
  • Circuit breaking: Resilience4JPlugin#combined

    private Mono<Void> combined(final ServerWebExchange exchange, final SoulPluginChain chain, final RuleData rule) {
        Resilience4JConf conf = Resilience4JBuilder.build(rule);
        return combinedExecutor.run(
                chain.execute(exchange).doOnSuccess(v -> {
                    HttpStatus status = exchange.getResponse().getStatusCode();
                    // Treat a missing or non-2xx status as a failure for the circuit breaker
                    if (status == null || !status.is2xxSuccessful()) {
                        exchange.getResponse().setStatusCode(null);
                        throw new CircuitBreakerStatusCodeException(status == null ? HttpStatus.INTERNAL_SERVER_ERROR : status);
                    }
                }), fallback(combinedExecutor, exchange, conf.getFallBackUri()), conf);
    }

    // combinedExecutor.run is called
    public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
        RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());
        CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());
        // Circuit breaker operation
        Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker))
                // Rate limiting operation
                .transformDeferred(RateLimiterOperator.of(rateLimiter))
                // Set the timeout
                .timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration())
                // On timeout, record the TimeoutException as a circuit breaker error
                .doOnError(TimeoutException.class, t -> circuitBreaker.onError(
                        resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
                        TimeUnit.MILLISECONDS,
                        t));
        if (fallback != null) {
            to = to.onErrorResume(fallback);
        }
        return to;
    }

Conclusion

  • The Soul gateway's Resilience4j plug-in provides rate limiting and circuit breaking; circuit breaking is turned off by default (circuitEnable = 0)
  • If a configured parameter value is smaller than its default, checkData replaces it with the default