preface

As the business becomes more and more complex, it is more important to ensure the robustness of the program. After all, a program that does not write bugs is not a good program. But how to ensure the stable operation of our program as far as possible, and the corresponding compensation can be made after the error, here we need to use the circuit breaker mechanism.

PS: Before entering the main text, we might as well think about two questions: ① What problem does the circuit breaker mechanism solve for us? ② How can we implement a simple circuit breaker by ourselves?


The implementation of custom fuses

Here we have a simple implementation of a timeout after the fusing example, here useful knowledge of AspectJ, for those familiar with Spring AOP knowledge should not be a problem.

There are two main steps:

  1. useFutureControl whether timeout, after the timeout taskcancelIt off.
  2. Call our own definitionfallbackMethod to process. The important thing to note here is,fallbackThe method parameters should be the same as the original method, so that we can carry out compensation measures. We can do it in the afternoonfallbackMethod stores these parameters with the help of the message-oriented middleware, and then reads them from the message-oriented middleware for compensation consumption at appropriate times.
 1@RestController

2public class HelloController {

3    private Random random = new Random();

4

5    @MyHystrixCommand(fallback="errorMethod")

6    @RequestMapping("/hello")

7    public String hello(@RequestParam("name") String message) throws InterruptedException {

8        int time = random.nextInt(200);

9        System.out.println("spend time : " + time + "ms");

10        Thread.sleep(time);

11        System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh");

12        return "hello world:" + message;

13    }

14

15    public String errorMethod(String message) {

16        return "error message";

17    }

18}

Copy the code
1@Target(ElementType.METHOD)

2@Retention(RetentionPolicy.RUNTIME)

3@Documented

4public @interface MyHystrixCommand {

5    int value(a) default 100;

6    String fallback(a) default "";

7}

Copy the code
 1@Aspect

2@Component

3public class MyHystrixCommandAspect {

4

5    ExecutorService executor = Executors.newFixedThreadPool(10);

6

7    @Pointcut(value = "@annotation(MyHystrixCommand)")

8    public void pointCut(a) {

9

10    }

11

12    @Around(value = "pointCut()&&@annotation(hystrixCommand)")

13    public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable {

14        int timeout = hystrixCommand.value();

15        Future future = executor.submit(() -> {

16            try {

17                return joinPoint.proceed();

18            } catch (Throwable throwable) {

19            }

20            return null;

21        });

22        Object returnValue = null;

23        try {

24            returnValue = future.get(timeout, TimeUnit.MILLISECONDS);

25        } catch (InterruptedException | ExecutionException | TimeoutException e) {

26            future.cancel(true);

27            if (StringUtils.isBlank(hystrixCommand.fallback())){

28                throw new Exception("fallback is null");

29            }

30            returnValue = invokeFallbackMethod(joinPoint, hystrixCommand.fallback());

31        }

32        return returnValue;

33    }

34

35    private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception {

36        Method method = findFallbackMethod(joinPoint, fallback);

37        if (method == null) {

38            throw new Exception("can not find fallback :" + fallback + " method");

39        } else {

40            method.setAccessible(true);

41            try {

42                Object invoke = method.invoke(joinPoint.getTarget(), joinPoint.getArgs());

43                return invoke;

44            } catch (IllegalAccessException | InvocationTargetException e) {

45                throw e;

46            }

47        }

48    }

49

50

51    private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) {

52        Signature signature = joinPoint.getSignature();

53        MethodSignature methodSignature = (MethodSignature) signature;

54        Method method = methodSignature.getMethod();

55Class<? >[] parameterTypes = method.getParameterTypes();

56        Method fallbackMethod = null;

57        try {

58        // The fallback method must take the same parameters as the original method

59            fallbackMethod = joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes);

60        } catch (NoSuchMethodException e) {

61        }

62        return fallbackMethod;

63    }

64

65}

Copy the code

Of course, the above example is just a simple implementation of post-timeout fusing. In practical application, we may also need to degrade after the concurrency exceeds the specified threshold. One of the most common scenarios is the second kill case. All of these things are addressed in Hystrix, which provides both thread pools and semaphores to address concurrency.


What is Hystrix?

Let’s take a look at the official introduction

In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, And fallback options, all of which improve your system’s overall resiliency.

In a distributed environment, it is inevitable that some services will fail to be invoked. Hystrix helps us to add tolerance policies and isolate services to prevent the failure of one service from affecting the invocation of another, which increases the resiliency of our system.


Hystrix process

HystrixCommandAspect Spring Cloud Hystrix

 1@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")

2    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {

3        Method method = getMethodFromTarget(joinPoint);

4.

5        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);/ / the first step

6        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);/ / the second step

7.

8        Object result;

9        try {

10            / / the third step

11            if(! metaHolder.isObservable()) {

12                result = CommandExecutor.execute(invokable, executionType, metaHolder);

13            } else {

14                result = executeObservable(invokable, executionType, metaHolder);

15            }

16        } 

17.

18        return result;

19    }

Copy the code

This aspect focuses on HystrixCommand, which is used for fuse downgrading, and HystrixCollapser, which is used for merge requests based on configuration (similar to database operations, which combine multiple INSERT statements into a single INSERT Batch statement). Let’s focus on the analysis of HystrixCommand.

Step 1: Get metadata (MetaHolder)

MetaHolder = metaHolderFactory.create(joinPoint); metaHolderFactory.create(joinPoint); , which encapsulates such as call method, parameter ARgs, method object target, dynamic proxy object proxy, callback method fallbackMethod and so on some metadata encapsulation. This data is used when creating the command object.

Step 2: Get the caller (HystrixInvokable)

It holds a command object, and can complete specific business logic through this command object when appropriate. For HystrixCommand, the command object is GenericObservableCommand and GenericCommand. The choice of command object depends on the return value of the method. If the return value is Observable, GenericObservableCommand is created; otherwise, GenericObservableCommand is created.

Step 3: Execute
 1    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {

2.

3        switch (executionType) {

4            case SYNCHRONOUS: {

5                return castToExecutable(invokable, executionType).execute();

6            }

7            case ASYNCHRONOUS: {

8                HystrixExecutable executable = castToExecutable(invokable, executionType);

9                if (metaHolder.hasFallbackMethodCommand()

10                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {

11                    return new FutureDecorator(executable.queue());

12                }

13                return executable.queue(a);

14            }

15            case OBSERVABLE: {

16                HystrixObservable observable = castToObservable(invokable);

17                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();

18            }

19.

20        }

21    }

Copy the code

From the above code snippet, it is easy to see that there are three strategies: synchronous, asynchronous, OBSERVABLE, Observables are divided into Cold Observables (Observable.toobServable ()) and Hot Observables (Observable.observe ()). So there are four ways of doing it. But the bottom will be called to AbstractCommand. ToObservable () method.

  • Execute (): Executes synchronously, returns a single object result, and throws an exception if an error occurs.
  • Queue (): execute asynchronously and return oneFutureObject that contains a single result returned after execution.
  • Observe (): This method returns oneObservableObject that represents multiple results of an operation but has been consumed by the subscriber.
  • ToObservable (): This method returns oneObservableObject, which represents multiple results of an operation that we need to manually subscribe to and consume ourselves.

In the execution of logic, a large number of RxJava, a variety of callback processing, see really dizzy, interested students can read the source code, I just introduce some key process points here.

ToObservable (toObservable, toObservable, toObservable);

1/* try from cache first */

2 if (requestCacheEnabled) {

3      HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);

4       if(fromCache ! =null) {

5           isResponseFromCache = true;

6           return handleRequestCacheHitAndEmitValues(fromCache, _cmd);

7        }

8}

Copy the code

(2) check whether the circuit breaker is open, if the circuit breaker is opened, the fallback processing directly by handleShortCircuitViaFallback:

 1private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

2        executionHook.onStart(_cmd);

3

4        /* determine if we're allowed to execute */

5        if (circuitBreaker.allowRequest()) {

6        }else {

7            return handleShortCircuitViaFallback();

8        }

9.

10}

Copy the code

(3) check whether to use the semaphore, if used, then determine whether filled, fill after an exception is thrown, through handleSemaphoreRejectionViaFallback directly to fallback to execute, the logic behind the does not perform. If it doesn’t return a DEFAULT TryableSemaphoreNoOp. DEFAULT, on executionSemaphore. TryAcquire () always returns true.

 1if (executionSemaphore.tryAcquire()) {

2  try {

3    /* used to track userThreadExecutionTime */

4    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

5    return executeCommandAndObserve(_cmd)

6            .doOnError(markExceptionThrown)

7            .doOnTerminate(singleSemaphoreRelease)

8            .doOnUnsubscribe(singleSemaphoreRelease);

9    } catch (RuntimeException e) {

10        return Observable.error(e);

11    }

12else {

13    return handleSemaphoreRejectionViaFallback();

14}

Copy the code

4 Run the logic in the command

Override AbstractCommand’s getExecutionObservable() method to invoke the corresponding logic in the following two command classes.

  • The run() method in GenericCommand
  • Construct () method in GenericObservableCommand

If a timeout is set in Run or Construct, a TimeoutException will be thrown if the execution time exceeds the threshold, or any other exception will be thrown during execution, which will be processed in fallback logic.

⑤ Execute fallback after exceptions occur

1   private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, 

2         final HystrixEventType eventType,

3         final FailureType failureType, 

4         final String message,

5         final Exception originalException)
 
{

6}

Copy the code

This method will be called eventually, so let’s see what types of FailureType there are.

  • COMMAND_EXCEPTION: performrunMethods orconstructMethod throws an exception.
  • TIMEOUT: indicates a TIMEOUT.
  • SHORTCIRCUIT: This parameter is executed when the circuit breaker is directly openedhandleShortCircuitViaFallbackMethods.
  • REJECTED_THREAD_EXECUTION: the thread pool and request queue are full.
  • REJECTED_SEMAPHORE_EXECUTION: Full semaphore.
  • BAD_REQUEST_EXCEPTION:
  • REJECTED_SEMAPHORE_FALLBACK:

conclusion

Hystrix uses a lot of RxJava, which makes reading the source code a bit confusing. Consider making a few breakpoints at key points, otherwise the callbacks will make you go round and round. However, I think RxJava code looks pretty beautiful, just a little bit uncomfortable, I will study RxJava sometime later.


END