Business background

In a microservice architecture, services inevitably depend on one another and generally communicate through REST APIs. Here is a borrowed diagram that illustrates a specific business scenario:

For example, in the microservice architecture of a shopping mall system, the order, goods, and inventory services depend closely on one another. Ideally nothing ever goes wrong. In practice, problems do occur while services run: network congestion, high latency (for example long Full GC pauses caused by a memory leak), or outright failure (for example a service hanging under a traffic surge). If the inventory service fails, every service that depends on it is badly affected, and the failure can eventually spread to the entire microservice system. This is called the avalanche effect.

Therefore, when a service fails, we need to isolate the failure promptly and keep it from spreading to the entire microservice system. To build a stable and reliable microservice system, we need to add self-protection and automatic failure isolation to it. Hystrix can do just that.

What is Hystrix

Hystrix is a fault-tolerance framework for distributed systems developed by Netflix, which also produced other excellent open source projects for distributed systems such as Eureka and Zuul. SpringCloud supports these Netflix projects, which have become sub-projects under SpringCloud.

Hystrix features:

  • Prevents cascading failures by circuit breaking
  • Fails fast and degrades gracefully
  • Provides real-time monitoring and alerting

A simple Hystrix implementation

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryUserIdCommand extends HystrixCommand<Integer> {
    private final static Logger logger = LoggerFactory.getLogger(QueryUserIdCommand.class);
    private UserServiceProvider userServiceProvider;

    public QueryUserIdCommand(UserServiceProvider userServiceProvider) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("userService"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("queryByUserId"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        // The breaker only evaluates the error rate once at least 10 requests have arrived
                        .withCircuitBreakerRequestVolumeThreshold(10)
                        // After rejecting requests for 5 seconds, the breaker goes half-open and lets a trial request through
                        .withCircuitBreakerSleepWindowInMilliseconds(5000)
                        // Open the breaker when the error rate reaches 50%
                        .withCircuitBreakerErrorThresholdPercentage(50)
                        .withExecutionTimeoutEnabled(true))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties
                        .Setter().withCoreSize(10)));
        this.userServiceProvider = userServiceProvider;
    }

    // The protected service call
    @Override
    protected Integer run() {
        return userServiceProvider.queryByUserId();
    }

    // Returned when run() fails, times out, or is rejected
    @Override
    protected Integer getFallback() {
        return -1;
    }
}

Invoking the command

Integer res = new QueryUserIdCommand(userServiceProvider).execute();
log.info("result:{}", res);

When the interface is accessed normally, the correct information is returned. If you change the service that UserServiceProvider depends on so that it throws an exception directly, -1 is always returned. This is how errors are isolated.
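The run-or-fallback behaviour described above can be sketched with the JDK alone. This is a minimal illustration under stated assumptions, not how Hystrix is implemented: FallbackRunner and its execute method are hypothetical names, and the timeout is passed in by the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper (not a Hystrix class): run a call with a timeout and a fallback. */
final class FallbackRunner {
    // Daemon threads so that an abandoned slow call never keeps the JVM alive
    private final ExecutorService pool = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "fallback-runner");
        t.setDaemon(true);
        return t;
    });

    <T> T execute(Supplier<T> call, T fallback, long timeoutMillis) {
        Callable<T> task = call::get;
        Future<T> future = pool.submit(task);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true); // interrupt the call if it is still running
            return fallback;     // the caller sees the default value, not the error
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return fallback;
        }
    }
}
```

A caller would write something like runner.execute(() -> provider.queryByUserId(), -1, 400), where provider stands in for the article's UserServiceProvider. A failure or timeout in the dependency then surfaces as -1 instead of an exception.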

Hystrix fault tolerance

Let's look at the fault-tolerance features Hystrix provides from three perspectives: resource isolation, circuit breaking, and degradation.

Resource isolation

As discussed earlier, microservices depend on each other by calling REST APIs. If the service call runs in the same thread as the business code and the network stalls during the call, it blocks not only that business code but also subsequent requests, because the number of threads in the pool is fixed. For this reason Hystrix provides resource isolation mechanisms, mainly thread isolation and semaphore isolation.

Resource isolation – Thread pools

When we applied Hystrix briefly above, we had to implement our own HystrixCommand and wrap the service call in that class. Thread-level resource isolation is implemented in HystrixCommand: each command runs in a thread pool identified by its thread pool key, which defaults to the command group key, so calls to one dependency run in their own pool without affecting other pools.

Hystrix maintains these thread pools through a ConcurrentHashMap:

final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

// Other code

if (!threadPools.containsKey(key)) {
    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
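To make the idea of one pool per key concrete, here is a small JDK-only sketch. It is not Hystrix source: BulkheadRegistry, submit and call are hypothetical names, and using a SynchronousQueue so that a saturated pool rejects work immediately is an assumption chosen to keep the example short.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical registry (not Hystrix source): one small bounded pool per dependency key. */
final class BulkheadRegistry {
    private final ConcurrentHashMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
    private final int poolSize;

    BulkheadRegistry(int poolSize) {
        this.poolSize = poolSize;
    }

    private ThreadPoolExecutor poolFor(String key) {
        // computeIfAbsent creates the pool atomically, at most once per key
        return pools.computeIfAbsent(key, k -> {
            ThreadFactory daemon = r -> {
                Thread t = new Thread(r, "bulkhead-" + k);
                t.setDaemon(true);
                return t;
            };
            // SynchronousQueue holds no tasks: when all threads are busy, submit() is rejected
            return new ThreadPoolExecutor(poolSize, poolSize, 1, TimeUnit.MINUTES,
                    new SynchronousQueue<>(), daemon);
        });
    }

    /** Starts the task on the pool for this key; throws RejectedExecutionException when saturated. */
    <T> Future<T> submit(String key, Callable<T> task) {
        return poolFor(key).submit(task);
    }

    /** Runs the task on its pool and waits; returns the fallback on rejection or failure. */
    <T> T call(String key, Callable<T> task, T fallback) {
        try {
            return submit(key, task).get();
        } catch (RejectedExecutionException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }
}
```

If every thread of the "inventory" pool is stuck on a slow call, further "inventory" calls get the fallback at once, while calls keyed "order" still run on their own pool. That is the isolation property this section describes.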

Advantages of thread isolation:

  • Protecting the current application from failures of other services ultimately improves the overall stability of the microservice system

  • Thread pool parameters can be set for one Command without affecting other commands. With Spring Cloud Hystrix it looks like this:

@HystrixCommand(groupKey = "UserGroup", commandKey = "GetUserByIdCommand",
        commandProperties = {
                // Timeout in milliseconds; on timeout the fallback is invoked
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "100"),
                // Minimum number of requests in the statistics window before the breaker evaluates the error rate (default 20)
                @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
                // Error percentage at which the breaker opens (default 50); set to 10 here
                @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10")
        },
        threadPoolProperties = {
                @HystrixProperty(name = "coreSize", value = "30"),
                @HystrixProperty(name = "maxQueueSize", value = "101"),
                @HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),
                @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),
                @HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "12"),
                @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1440")
        })
  • Requests submitted to a saturated thread pool trigger the fallback immediately, which also acts as rate limiting

The downside of dedicated thread pools is the overhead of scheduling, context switching, and so on. Usually this overhead is small relative to the benefits, and the trade is well worth making. For very cheap requests, however, such as reading a value straight from a cache and returning it, the cost of the thread pool can be relatively large. In that case you can use the other approach: semaphore isolation.

Resource isolation – Semaphore isolation

Strictly speaking, semaphore isolation is not resource isolation but a rate-limiting technique that prevents large numbers of threads from blocking. It works like the Semaphore class in JUC, except that a thread that cannot acquire a permit calls the fallback instead of blocking. Enabling it is simple and can be declared when implementing the HystrixCommand:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryUserIdCommand extends HystrixCommand<Integer> {
    private final static Logger logger = LoggerFactory.getLogger(QueryUserIdCommand.class);
    private UserServiceProvider userServiceProvider;

    public QueryUserIdCommand(UserServiceProvider userServiceProvider) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("userService"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("queryByUserId"))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        // The breaker only evaluates the error rate once at least 10 requests have arrived
                        .withCircuitBreakerRequestVolumeThreshold(10)
                        // After rejecting requests for 5 seconds, the breaker goes half-open and lets a trial request through
                        .withCircuitBreakerSleepWindowInMilliseconds(5000)
                        // Open the breaker when the error rate reaches 50%
                        .withCircuitBreakerErrorThresholdPercentage(50)
                        // Switch from the default thread isolation to semaphore isolation
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                        .withExecutionTimeoutEnabled(true))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties
                        .Setter().withCoreSize(10)));
        this.userServiceProvider = userServiceProvider;
    }

    @Override
    protected Integer run() {
        return userServiceProvider.queryByUserId();
    }

    @Override
    protected Integer getFallback() {
        return -1;
    }
}
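The difference from a blocking java.util.concurrent.Semaphore can be sketched in a few lines: the caller tries to take a permit without waiting and goes straight to the fallback if none is free. SemaphoreGuard is a hypothetical name used for illustration only. The call runs on the caller's own thread, which is why this approach adds no thread pool overhead.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical guard (not a Hystrix class): limits concurrent calls, never blocks the caller. */
final class SemaphoreGuard {
    private final Semaphore permits;

    SemaphoreGuard(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    <T> T execute(Supplier<T> call, T fallback) {
        // tryAcquire() returns immediately; a plain acquire() would block here instead
        if (!permits.tryAcquire()) {
            return fallback;
        }
        try {
            return call.get(); // runs on the calling thread, no hand-off to a pool
        } catch (RuntimeException e) {
            return fallback;
        } finally {
            permits.release();
        }
    }
}
```

Because the call stays on the caller's thread, a call that hangs cannot be abandoned the way a pooled call can. This is one reason the article treats semaphore isolation as rate limiting rather than true isolation.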

In summary, resource isolation comes down to these two approaches. Thread isolation is used in most cases because it is resource isolation in the true sense, whereas semaphore isolation only limits concurrency.

Circuit breaker

Stopping a failed service's failure from spreading to all the services that depend on it is what a circuit breaker does. The simple example at the beginning already implemented circuit breaking, so here we introduce some important circuit-breaker parameters:

  • circuitBreaker.enabled: whether the circuit breaker is enabled. Default: true.

  • circuitBreaker.forceOpen: forces the breaker open, so all requests are rejected. Default: false.

  • circuitBreaker.forceClosed: forces the breaker closed, so all requests are let through. Default: false.

  • circuitBreaker.errorThresholdPercentage: the error rate threshold. Default: 50 (percent). If the share of failed or timed-out calls within the statistics window reaches 50%, the breaker opens.

  • circuitBreaker.requestVolumeThreshold: default 20. At least 20 requests must arrive within the statistics window before the error rate is evaluated. For example, if only 19 requests come in and all of them fail, the breaker still does not open.

  • circuitBreaker.sleepWindowInMilliseconds: how long the breaker stays open before it goes half-open. Default: 5000 ms. That is, 5 seconds after opening, the breaker goes half-open and lets a trial request through to see whether the call succeeds.

Let’s write code to test a few of these parameters:

@HystrixCommand(groupKey = "productStockOpLog", commandKey = "addProductStockOpLog",
        fallbackMethod = "addProductStockOpLogFallback",
        commandProperties = {
                // Timeout in milliseconds; on timeout the fallback is invoked
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "400"),
                // Minimum number of requests in a statistics window before the breaker decides whether to open (default 20)
                @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),
                // Error percentage within a statistics window at which the breaker opens (default 50); set to 10 here
                @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10")
        })

I first set the timeout to 400 ms, then changed the service provider's method to this:

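@Service
public class ServiceProviderImpl implements ServiceProvider {
    // Number of calls received so far
    private int c = 0;

    @Override
    public Integer service() {
        // The first ten calls sleep 500 ms, longer than the 400 ms timeout
        if (c++ < 10) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
        return c;
    }
}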

The implementation is simple: the first ten calls sleep for 500 ms, so the first ten calls are certain to time out.

On the client side, the implementation is as follows:

@Test
public void test() throws InterruptedException {
    for (int i = 1; i < 15; i++) {
        HystrixCommand<Integer> command = new QueryUserIdCommand(userServiceProvider);
        Integer r = command.execute();
        String method = r == -1 ? "fallback" : "success";
        System.out.println("Call " + i + ", result: " + method);
    }
    // Wait 5.5s for the breaker to enter the half-open state
    Thread.sleep(5500);
    for (int i = 15; i <= 20; i++) {
        HystrixCommand<Integer> command = new QueryUserIdCommand(userServiceProvider);
        Integer r = command.execute();
        String method = r == -1 ? "fallback" : "success";
        System.out.println("Call " + i + ", result: " + method);
    }
}

View the execution result:

Call 1, result: fallback
Call 2, result: fallback
Call 3, result: fallback
Call 4, result: fallback
Call 5, result: fallback
Call 6, result: fallback
Call 7, result: fallback
Call 8, result: fallback
Call 9, result: fallback
Call 10, result: fallback
Call 11, result: fallback
Call 12, result: fallback
Call 13, result: fallback
Call 14, result: fallback
Call 15, result: success
Call 16, result: success
Call 17, result: success
Call 18, result: success
Call 19, result: success
Call 20, result: success

Analyzing the output: calls 1-10 fall back because they time out. By call 11 the number of requests has reached 10 and the failure rate exceeds the configured threshold, so the breaker is open and calls 11-14 fail fast without reaching the service. After the 5.5-second sleep, the breaker is half-open and lets call 15 through to the service. That call succeeds, so the breaker closes and the remaining calls go through normally.
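The state transitions in this walkthrough can be reproduced with a deliberately simplified breaker. This is a sketch under stated assumptions, not Hystrix's implementation: SimpleBreaker is a hypothetical class, it counts requests since the last reset instead of using a rolling statistics window, it treats any RuntimeException as a failure (a timeout would be modeled the same way), and the clock is injected so the sleep window can be simulated.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical, simplified circuit breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED. */
final class SimpleBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMillis;
    private final LongSupplier clock; // injected so that tests can control time

    private State state = State.CLOSED;
    private int total;
    private int failures;
    private long openedAt;

    SimpleBreaker(int requestVolumeThreshold, int errorThresholdPercentage,
                  long sleepWindowMillis, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    synchronized State state() {
        return state;
    }

    <T> T execute(Supplier<T> call, T fallback) {
        if (!allowRequest()) {
            return fallback; // short-circuited: the service is not called at all
        }
        try {
            T result = call.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            return fallback;
        }
    }

    private synchronized boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= sleepWindowMillis) {
            state = State.HALF_OPEN; // sleep window elapsed: let one trial request through
            return true;
        }
        return state == State.CLOSED;
    }

    private synchronized void onSuccess() {
        if (state == State.HALF_OPEN) { // the trial succeeded: close and start counting afresh
            state = State.CLOSED;
            total = 0;
            failures = 0;
        } else {
            total++;
        }
    }

    private synchronized void onFailure() {
        if (state == State.HALF_OPEN) { // the trial failed: open again for another sleep window
            trip();
            return;
        }
        total++;
        failures++;
        if (total >= requestVolumeThreshold && failures * 100 >= errorThresholdPercentage * total) {
            trip();
        }
    }

    private void trip() {
        state = State.OPEN;
        openedAt = clock.getAsLong();
        total = 0;
        failures = 0;
    }
}
```

With a threshold of 10 requests and a 5000 ms sleep window, ten failing calls open the breaker, the next calls return the fallback without invoking the supplier, and the first call after the window elapses acts as the trial that closes the breaker again.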

Degradation

In fact, circuit breaking and degradation work on a similar principle: both are fail-fast measures taken when a service call cannot be completed normally. Their motivations differ, though. Circuit breaking prevents a failure from spreading and so protects the stability of the system.

Degradation is a deliberate, manual measure. During peak traffic, some non-core interfaces are sacrificed and their resources are given to the hot interfaces so that those keep working normally. This is service degradation. Every year when people rush to grab tickets on 12306, they compete for the popular trains, and users who search for unpopular tickets a few days out may get no results. That is degradation at work: the interfaces not involved in the rush stop serving and give up their resources to the ones that are.

Degradation is therefore simple to implement. As with the circuit breaker, write the fallback logic for a failed call, then stop the other services directly. Those interfaces can no longer be called normally, but instead of reporting an error they return a result at a reduced service level.

@FeignClient(value = "microservicecloud-test", fallbackFactory = UserServiceFallbackFactory.class)
public interface UserService
{
    public boolean add(User user);

    public User queryUserByUserId(Long id); 
}

The Service interface above is configured with a fallbackFactory in its annotation. The factory is implemented like this:

@Component
public class UserServiceFallbackFactory implements FallbackFactory<UserService>
{
    @Override
    public UserService create(Throwable throwable)
    {
        return new UserService() {
            @Override
            public boolean add(User user)
            {
                return false;
            }

            @Override
            public User queryUserByUserId(Long id)
            {
                User user = new User();
                return user;
            }
        };
    }
}

This roughly completes a degradation configuration. (The same fallback can also serve the circuit breaker.)
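The manual side of degradation, where an operator decides which interfaces to sacrifice, can be sketched as a simple switch. DegradeSwitch and its method names are hypothetical. In a real system the set of degraded interfaces would typically come from a configuration center, which is outside the scope of this article.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical manual degradation switch: degraded interfaces answer with their fallback. */
final class DegradeSwitch {
    // Names of interfaces that operators have chosen to degrade
    private final Set<String> degraded = ConcurrentHashMap.newKeySet();

    void degrade(String interfaceName) {
        degraded.add(interfaceName);
    }

    void restore(String interfaceName) {
        degraded.remove(interfaceName);
    }

    <T> T call(String interfaceName, Supplier<T> real, Supplier<T> fallback) {
        // A degraded interface never touches the real service, which frees its resources
        return degraded.contains(interfaceName) ? fallback.get() : real.get();
    }
}
```

During a traffic peak, an operator would call degrade("search") so that search requests get a cheap default answer, and call restore("search") once the peak has passed.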

Ways to invoke a Hystrix command

There are four ways to invoke a Hystrix command:

execute

Remember the simple Hystrix demo at the beginning? It invoked the command like this:

Integer res = new QueryUserIdCommand(userServiceProvider).execute();
log.info("result:{}", res);

Here the execute method is used, which runs the run method synchronously.

So what if I want to run it asynchronously? Do I have to wrap it in a Callable myself? No, Hystrix has already thought of this: use the queue method.

queue

This call is simple, so I'll just post the code:

Future<Integer> future = new QueryUserIdCommand(userServiceProvider).queue();
return future.get();

The future.get() call still blocks, but the service call itself runs asynchronously.
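The point that only the final get blocks can be seen in a JDK-only analogue. This sketch uses CompletableFuture rather than Hystrix, and QueueStyleCall, queue and demo are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical JDK-only analogue of queue(): start the call now, collect the result later. */
final class QueueStyleCall {
    static <T> CompletableFuture<T> queue(Supplier<T> serviceCall) {
        // supplyAsync returns immediately; the call runs on another thread
        return CompletableFuture.supplyAsync(serviceCall);
    }

    static String demo() {
        CompletableFuture<String> future = queue(() -> "user-42"); // does not block
        String local = "did local work";                           // caller keeps working meanwhile
        return local + ", then got " + future.join();              // blocks only here
    }
}
```

The caller pays the waiting cost only at join(), so any work placed between queue and join overlaps with the service call.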

observe

Next comes a new requirement: I need to send multiple requests and do something with each result. Neither queue nor execute handles this conveniently. In this case we can use observe. To emit multiple results, extend HystrixObservableCommand instead of HystrixCommand:

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import org.springframework.web.client.RestTemplate;
import rx.Observable;
import rx.Subscriber;

public class UserServiceObserveCommand extends HystrixObservableCommand<String> {

    private RestTemplate restTemplate;

    protected UserServiceObserveCommand(String commandGroupKey, RestTemplate restTemplate) {
        super(HystrixCommandGroupKey.Factory.asKey(commandGroupKey));
        this.restTemplate = restTemplate;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (!subscriber.isUnsubscribed()) {
                        System.out.println("Service call started!");
                        String result = restTemplate.getForEntity("http://your-url1", String.class).getBody();
                        // Trigger the listener
                        subscriber.onNext(result);
                        String result1 = restTemplate.getForEntity("http://your-url2", String.class).getBody();
                        // Trigger the listener
                        subscriber.onNext(result1);
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
    }

    // Fallback used for degradation
    @Override
    protected Observable<String> resumeWithFallback() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext("fallback");
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });
    }
}

As you can see, after each service call, subscriber.onNext is called to trigger the listener. What does the listening side look like?

    @Test
    public void test() throws ExecutionException, InterruptedException {
        UserServiceObserveCommand command = new UserServiceObserveCommand("user", restTemplate);
        // Invoke with observe
        Observable<String> observable = command.observe();
        // Register the listener
        observable.subscribe(new Observer<String>() {
            // Called once when the whole sequence completes
            @Override
            public void onCompleted() {
                System.out.println("Call done!");
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            // Called for each result emitted through onNext
            @Override
            public void onNext(String s) {
                System.out.println("Finished calling a service!");
            }
        });
    }

This code should give you an idea of what observe does.

toObservable

observe is hot execution and toObservable is cold execution. What's the difference?

With observe (hot execution), the call method runs whether or not a client has registered a listener. Whether you listen is up to you; everything that needed to be called has already been called.

With toObservable (cold execution), the call method runs only after a client registers a listener. Until you subscribe, nothing is executed.
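The hot/cold distinction can be modeled without RxJava. The sketch below is a synchronous simplification under stated assumptions, not how Hystrix or RxJava work internally: Execution, hot, cold and subscribe are hypothetical names, and real commands run asynchronously and can emit several values.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical model of hot vs cold execution (synchronous, single value). */
final class Execution<T> {
    private final Supplier<T> work;
    private boolean started;
    private T result;

    private Execution(Supplier<T> work) {
        this.work = work;
    }

    /** Hot, like observe(): the work runs right away, before anyone subscribes. */
    static <T> Execution<T> hot(Supplier<T> work) {
        Execution<T> execution = new Execution<>(work);
        execution.start();
        return execution;
    }

    /** Cold, like toObservable(): the work is deferred until subscribe() is called. */
    static <T> Execution<T> cold(Supplier<T> work) {
        return new Execution<>(work);
    }

    private void start() {
        if (!started) {
            started = true;
            result = work.get();
        }
    }

    boolean hasStarted() {
        return started;
    }

    void subscribe(Consumer<T> listener) {
        start();                 // no-op for hot, triggers the work for cold
        listener.accept(result); // a hot execution hands over the stored result
    }
}
```

Creating a hot execution triggers the work immediately and a later subscriber simply receives the stored result, whereas a cold execution does nothing until subscribe is called.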

These are the main functions and uses of Hystrix for circuit breaking, degradation, and rate limiting. Hystrix also supports monitoring through Hystrix Dashboard, which we'll talk about later.