Preface
As the business grows more complex, keeping the program robust becomes more important. After all, a programmer who never writes bugs is not a good programmer. So how do we keep our program running as stably as possible, and how do we compensate after an error occurs? This is where the circuit breaker mechanism comes in.
PS: Before entering the main text, we might as well think about two questions: ① What problem does the circuit breaker mechanism solve for us? ② How can we implement a simple circuit breaker by ourselves?
Implementing a custom circuit breaker
Here is a simple example that falls back after a timeout. It uses AspectJ, which should pose no problem for anyone familiar with Spring AOP.
There are two main steps:
- Use a Future to enforce the timeout, and cancel the task once it times out.
- Call the fallback method we defined ourselves to handle the failure. Note that the fallback method should take the same parameters as the original method so that we can take compensating action. For example, the fallback method can store these parameters in message-oriented middleware, and a consumer can read them back later to perform the compensation at an appropriate time.
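The two steps above can be sketched in plain Java, without any AOP. This is only a minimal sketch under stated assumptions: the class name TimeoutFallback and its call method are made up for illustration, and the pool size of 4 is arbitrary.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical helper, not part of any library.
final class TimeoutFallback {
    // Daemon threads so the pool never keeps the JVM alive.
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable);
        thread.setDaemon(true);
        return thread;
    });

    /** Runs primary.apply(arg); on timeout or failure, returns fallback.apply(arg). */
    static <A, R> R call(A arg, Function<A, R> primary, Function<A, R> fallback, long timeoutMillis) {
        Callable<R> task = () -> primary.apply(arg);
        Future<R> future = EXECUTOR.submit(task);
        try {
            // Step 1: the Future enforces the timeout.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true);        // interrupt the task if it is still running
            return fallback.apply(arg); // Step 2: the fallback receives the same argument
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return fallback.apply(arg);
        }
    }
}
```

A call such as TimeoutFallback.call("tom", slowService, s -> "error message", 100) returns the fallback value whenever slowService takes longer than 100 ms or throws.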
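import java.util.Random;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {
    private final Random random = new Random();

    // Falls back to errorMethod if hello() takes longer than the default 100 ms
    @MyHystrixCommand(fallback = "errorMethod")
    @RequestMapping("/hello")
    public String hello(@RequestParam("name") String message) throws InterruptedException {
        int time = random.nextInt(200);
        System.out.println("spend time : " + time + "ms");
        Thread.sleep(time);
        System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhh");
        return "hello world:" + message;
    }

    // Same parameter list as hello(), so the aspect can pass the original arguments
    public String errorMethod(String message) {
        return "error message";
    }
}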
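import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyHystrixCommand {
    // Timeout in milliseconds
    int value() default 100;

    // Name of the fallback method; it must take the same parameters as the annotated method
    String fallback() default "";
}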
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang3.StringUtils; // assumption: StringUtils comes from Apache Commons Lang 3
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class MyHystrixCommandAspect {

    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    // Assumes MyHystrixCommand lives in the same package as this aspect
    @Pointcut("@annotation(MyHystrixCommand)")
    public void pointCut() {
    }

    @Around("pointCut() && @annotation(hystrixCommand)")
    public Object doPointCut(ProceedingJoinPoint joinPoint, MyHystrixCommand hystrixCommand) throws Throwable {
        int timeout = hystrixCommand.value();
        Future<Object> future = executor.submit(() -> {
            try {
                return joinPoint.proceed();
            } catch (Throwable throwable) {
                // Rethrow instead of swallowing, so the failure surfaces from future.get()
                // as an ExecutionException and triggers the fallback
                throw new RuntimeException(throwable);
            }
        });
        try {
            return future.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // Cancel the task, then hand over to the fallback method
            future.cancel(true);
            if (StringUtils.isBlank(hystrixCommand.fallback())) {
                throw new Exception("fallback is null");
            }
            return invokeFallbackMethod(joinPoint, hystrixCommand.fallback());
        }
    }

    private Object invokeFallbackMethod(ProceedingJoinPoint joinPoint, String fallback) throws Exception {
        Method method = findFallbackMethod(joinPoint, fallback);
        if (method == null) {
            throw new Exception("can not find fallback :" + fallback + " method");
        }
        method.setAccessible(true);
        // Pass the original arguments through so the fallback can compensate
        return method.invoke(joinPoint.getTarget(), joinPoint.getArgs());
    }

    private Method findFallbackMethod(ProceedingJoinPoint joinPoint, String fallbackMethodName) {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Class<?>[] parameterTypes = methodSignature.getMethod().getParameterTypes();
        try {
            // The fallback method must be public and take the same parameters as the original method
            return joinPoint.getTarget().getClass().getMethod(fallbackMethodName, parameterTypes);
        } catch (NoSuchMethodException e) {
            return null; // the caller reports the missing fallback
        }
    }
}
Of course, the example above only implements circuit breaking after a timeout. In practice, we may also need to degrade once concurrency exceeds a given threshold; the most common scenario is a flash sale. Hystrix covers all of these cases, and it offers both thread pools and semaphores for limiting concurrency.
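To illustrate degrading once concurrency exceeds a threshold, here is a minimal thread-pool sketch using only the JDK. It is not how Hystrix is implemented; BoundedPoolCommand and its execute method are hypothetical names, and the sketch assumes that a request arriving while every thread is busy should go straight to the fallback.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of Hystrix.
final class BoundedPoolCommand {
    private final ThreadPoolExecutor pool;

    BoundedPoolCommand(int maxConcurrent) {
        // A SynchronousQueue has no capacity: once all threads are busy,
        // submit() is rejected instead of being queued.
        pool = new ThreadPoolExecutor(
                maxConcurrent, maxConcurrent, 1, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                runnable -> {
                    Thread thread = new Thread(runnable);
                    thread.setDaemon(true); // do not keep the JVM alive
                    return thread;
                });
    }

    <T> T execute(Supplier<T> primary, Supplier<T> fallback) {
        Callable<T> task = primary::get;
        Future<T> future;
        try {
            future = pool.submit(task);
        } catch (RejectedExecutionException e) {
            return fallback.get(); // over the concurrency limit: degrade immediately
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        } catch (ExecutionException e) {
            return fallback.get(); // the primary itself failed
        }
    }
}
```

With new BoundedPoolCommand(1), a second request that arrives while the first is still running gets the fallback value instead of waiting.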
What is Hystrix?
Let’s take a look at the official introduction:
In a distributed environment, inevitably some of the many service dependencies will fail. Hystrix is a library that helps you control the interactions between these distributed services by adding latency tolerance and fault tolerance logic. Hystrix does this by isolating points of access between the services, stopping cascading failures across them, and providing fallback options, all of which improve your system’s overall resiliency.
In other words, some service calls will inevitably fail in a distributed environment. Hystrix helps us add tolerance policies and isolate services so that the failure of one service does not affect calls to another, which improves the resiliency of our system.
Hystrix process
The entry point in Spring Cloud Hystrix is the HystrixCommandAspect aspect:
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);
    // ...
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint); // Step 1
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder); // Step 2
    // ...
    Object result;
    try {
        // Step 3
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    }
    // ...
    return result;
}
This aspect handles two annotations: HystrixCommand, which is used for circuit breaking and degradation, and HystrixCollapser, which merges requests according to configuration (similar to combining multiple INSERT statements into a single batch INSERT in a database). Let’s focus on HystrixCommand.
Step 1: Get metadata (MetaHolder)
MetaHolder metaHolder = metaHolderFactory.create(joinPoint); builds a MetaHolder that encapsulates metadata such as the invoked method (method), its arguments (args), the target object (target), the dynamic proxy object (proxy), and the fallback method (fallbackMethod). This data is used when creating the command object.
Step 2: Get the caller (HystrixInvokable)
It holds a command object and uses it to carry out the actual business logic at the appropriate time. For HystrixCommand, the command object is either GenericObservableCommand or GenericCommand, depending on the return type of the method: if the method returns an Observable, a GenericObservableCommand is created; otherwise, a GenericCommand is created.
Step 3: Execute
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    // ...
    switch (executionType) {
        case SYNCHRONOUS: {
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        // ...
    }
}
From the code above it is easy to see that there are three execution types: synchronous, asynchronous, and observable. Observables are further divided into cold Observables (observable.toObservable()) and hot Observables (observable.observe()), so there are four ways to execute a command. Underneath, all of them end up calling the AbstractCommand.toObservable() method.
- execute(): executes synchronously, returns a single result object, and throws an exception if an error occurs.
- queue(): executes asynchronously and returns a Future object that contains the single result returned after execution.
- observe(): returns an Observable object that represents the results of the operation. It is a hot Observable: the command starts executing immediately, whether or not anyone has subscribed yet.
- toObservable(): returns an Observable object that represents the results of the operation. It is a cold Observable: the command only executes once we subscribe to it and consume the results ourselves.
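The relationship between the four methods can be illustrated with a small JDK-only analogy. This is not RxJava and not Hystrix code: MiniCommand is a hypothetical class, a Supplier of CompletableFuture stands in for a cold Observable, and an already running CompletableFuture stands in for a hot one. The point of the sketch is only that every method is layered on top of toObservable().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical analogy; the method names mirror the list above, the types do not.
final class MiniCommand<T> {
    private final Supplier<T> work;

    MiniCommand(Supplier<T> work) {
        this.work = work;
    }

    /** Cold: returns a recipe; the work only starts when the caller invokes get() on it. */
    Supplier<CompletableFuture<T>> toObservable() {
        return () -> CompletableFuture.supplyAsync(work);
    }

    /** Hot: starts the work immediately and hands back the in-flight result. */
    CompletableFuture<T> observe() {
        return toObservable().get();
    }

    /** Asynchronous: a Future for the single result. */
    Future<T> queue() {
        return observe();
    }

    /** Synchronous: blocks until the result is available; failures are rethrown. */
    T execute() {
        return observe().join();
    }
}
```

Calling toObservable() alone runs nothing, whereas observe(), queue(), and execute() each trigger one run of the work.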
The execution logic makes heavy use of RxJava with all kinds of callbacks, which can be dizzying to read. Interested readers can go through the source code; here I only introduce the key points in the flow.
① In toObservable(), first check whether the request cache is enabled; on a cache hit, the cached result is returned directly:
/* try from cache first */
if (requestCacheEnabled) {
    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
    if (fromCache != null) {
        isResponseFromCache = true;
        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
    }
}
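The cache-first idea can be sketched with a plain map. This is a simplified stand-in rather than Hystrix's request cache: RequestCache and its get method are hypothetical names, and the sketch assumes one cache instance is created per request and discarded afterwards.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper; assumed to live for the duration of a single request.
final class RequestCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final boolean enabled;

    RequestCache(boolean enabled) {
        this.enabled = enabled;
    }

    /** Returns the cached value for cacheKey, running loader only on a miss. */
    V get(K cacheKey, Function<K, V> loader) {
        if (!enabled) {
            return loader.apply(cacheKey); // caching disabled: always execute
        }
        // On a hit the loader is skipped entirely, just like the early return above.
        return cache.computeIfAbsent(cacheKey, loader);
    }
}
```

Two lookups with the same key execute the loader once when the cache is enabled, and twice when it is disabled.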
② Check whether the circuit breaker is open. If it is, go straight to fallback processing via handleShortCircuitViaFallback:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    executionHook.onStart(_cmd);

    /* determine if we're allowed to execute */
    if (circuitBreaker.allowRequest()) {
    } else {
        return handleShortCircuitViaFallback();
    }
    // ...
}
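To make the open/closed check more concrete, here is a minimal circuit breaker sketch, which is also one possible answer to question ② from the preface. It is not Hystrix's implementation: Hystrix trips on the error percentage within a rolling window, whereas this sketch assumes a simple count of consecutive failures. SimpleCircuitBreaker and all of its members are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical, simplified breaker: closed -> open -> one trial request -> closed or open again.
final class SimpleCircuitBreaker {
    private final int failureThreshold;   // consecutive failures that open the circuit
    private final long sleepWindowMillis; // how long to stay open before allowing a trial request
    private int consecutiveFailures = 0;
    private long openedAt = -1;           // -1 means the circuit is closed

    SimpleCircuitBreaker(int failureThreshold, long sleepWindowMillis) {
        this.failureThreshold = failureThreshold;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    /** Closed: always allow. Open: allow one trial request per elapsed sleep window. */
    synchronized boolean allowRequest(long now) {
        if (openedAt < 0) {
            return true;
        }
        if (now - openedAt >= sleepWindowMillis) {
            openedAt = now; // restart the window so only this one trial gets through
            return true;
        }
        return false;
    }

    synchronized void markSuccess() {
        consecutiveFailures = 0;
        openedAt = -1; // close the circuit
    }

    synchronized void markFailure(long now) {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = now; // open (or re-open) the circuit
        }
    }

    <T> T execute(Supplier<T> primary, Supplier<T> fallback) {
        if (!allowRequest(System.currentTimeMillis())) {
            return fallback.get(); // short-circuit: the primary is not called at all
        }
        try {
            T result = primary.get();
            markSuccess();
            return result;
        } catch (RuntimeException e) {
            markFailure(System.currentTimeMillis());
            return fallback.get();
        }
    }
}
```

Once the threshold is reached, further calls return the fallback without touching the primary until the sleep window has passed, which is what keeps a failing dependency from being hammered.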
③ Check whether semaphore isolation is used. If it is, check whether the semaphore is full; once it is full, the request is rejected and handed directly to the fallback through handleSemaphoreRejectionViaFallback, and the subsequent logic is not executed. If semaphore isolation is not used, the default TryableSemaphoreNoOp.DEFAULT is returned, and executionSemaphore.tryAcquire() always returns true.
if (executionSemaphore.tryAcquire()) {
    try {
        /* used to track userThreadExecutionTime */
        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
        return executeCommandAndObserve(_cmd)
                .doOnError(markExceptionThrown)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
} else {
    return handleSemaphoreRejectionViaFallback();
}
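The tryAcquire pattern above can be sketched with a non-blocking counter. This is an illustrative sketch only: SemaphoreIsolation, Permit, NO_OP, and limited are hypothetical names chosen to mirror the idea of a no-op semaphore next to a counting one, not Hystrix's actual classes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of semaphore isolation on the calling thread.
final class SemaphoreIsolation {
    interface Permit {
        boolean tryAcquire();
        void release();
    }

    /** Used when semaphore isolation is off: tryAcquire() always succeeds. */
    static final Permit NO_OP = new Permit() {
        public boolean tryAcquire() { return true; }
        public void release() { }
    };

    /** Counting permit that rejects immediately instead of blocking when it is full. */
    static Permit limited(int maxConcurrent) {
        AtomicInteger inFlight = new AtomicInteger();
        return new Permit() {
            public boolean tryAcquire() {
                if (inFlight.incrementAndGet() > maxConcurrent) {
                    inFlight.decrementAndGet(); // undo the reservation
                    return false;
                }
                return true;
            }
            public void release() {
                inFlight.decrementAndGet();
            }
        };
    }

    static <T> T execute(Permit permit, Supplier<T> primary, Supplier<T> fallback) {
        if (!permit.tryAcquire()) {
            return fallback.get(); // semaphore full: go straight to the fallback
        }
        try {
            return primary.get();
        } catch (RuntimeException e) {
            return fallback.get();
        } finally {
            permit.release(); // always give the permit back
        }
    }
}
```

Unlike the thread-pool approach, the primary runs on the caller's own thread here; the permit only caps how many callers may be inside it at once.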
④ Execute the logic in the command
The overridden getExecutionObservable() method of AbstractCommand invokes the corresponding logic in one of the following two command classes:
- The run() method in GenericCommand
- The construct() method in GenericObservableCommand
If a timeout is configured and run or construct exceeds it, a TimeoutException is thrown. That exception, like any other exception thrown during execution, is handled by the fallback logic.
⑤ Execute the fallback after an exception occurs
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd,
        final HystrixEventType eventType,
        final FailureType failureType,
        final String message,
        final Exception originalException) {
    // ...
}
Every failure path eventually calls this method, so let’s look at the possible FailureType values.
- COMMAND_EXCEPTION: the run method or the construct method threw an exception.
- TIMEOUT: the execution timed out.
- SHORTCIRCUIT: the circuit breaker is open, so the handleShortCircuitViaFallback method is called directly.
- REJECTED_THREAD_EXECUTION: the thread pool and request queue are full.
- REJECTED_SEMAPHORE_EXECUTION: the semaphore is full.
- BAD_REQUEST_EXCEPTION:
- REJECTED_SEMAPHORE_FALLBACK:
Conclusion
Hystrix uses a lot of RxJava, which makes the source code somewhat confusing to read. Consider setting a few breakpoints at the key points; otherwise the callbacks will have you going around in circles. That said, I find RxJava code rather elegant once you get used to it, and I plan to study RxJava when I have time.