Column series: SpringCloud column series
Series of articles:
SpringCloud source series (1) – registry initialization for Eureka
SpringCloud source code series (2) – Registry Eureka service registration, renewal
SpringCloud source code series (3) – Registry Eureka crawl registry
SpringCloud source code series (4) – Registry Eureka service offline, failure, self-protection mechanism
SpringCloud source series (5) – Registry EurekaServer cluster for Eureka
SpringCloud source Code Series (6) – Summary of the Registry Eureka
SpringCloud source series (7) – load balancing Ribbon RestTemplate
SpringCloud source series (8) – load balancing Ribbon core principles
SpringCloud source series (9) – load balancing Ribbon core components and configuration
SpringCloud source Series (10) – HTTP client component of load balancing Ribbon
SpringCloud Source Series (11) – Retries and summaries of the Load Balancing Ribbon
SpringCloud source Code Series (12) – Basic usage of Service invocation Feign
SpringCloud source Code Series (13) – Service invocation of Feign’s scanning @FeignClient annotation interface
SpringCloud source code series (14) – Service calls to Feign build @FeignClient interface dynamic proxy
SpringCloud source Series (15) – Service calls Feign with the Ribbon for load balancing requests
SpringCloud source code series (16) – Fuse Hystrix basic introduction
SpringCloud source series (17) – fuse Hystrix fetch execution subscription object Observable
SpringCloud source code series (18) – Fuse Hystrix implementation core principles
Hystrix integrated with Feign
FeignClient generates proxy objects
@FeignClient interface dynamic proxy has been detailed analysis of @FeignClient interface to generate dynamic proxy and remote call principle, the following article is posted in the principle of the article. Easy to review and understand.
Hystrix is related to HystrixTargeter, which is used to create FeignClient dynamic proxy objects. In this step, if Feign has Hystrix enabled, HystrixTargeter is used to create the dynamic proxy object, otherwise DefaultTargeter is used to create the proxy object. The Hystrix and Feign integration can be analyzed from the HystrixTargeter portal.
Feign integration Hystrix
Feign open Hystrix
In the configuration class FeignAutoConfiguration, you can see the following configuration to configure the concrete implementation class of the Targeter. The introduction of feign. Hystrix. HystrixFeign, Targeter implementation classes for HystrixTargeter, or is the default DefaultTargeter.
Feign. Hystrix. HystrixFeign this class belongs to feign – hystrix depend on package, that is to say, it will open hystrix feign function, need to join first feign – hystrix component package. However, spring-cloud-starter-OpenFeign has already helped us introduce feign-Hystrix dependencies, so we don’t need to introduce them separately.
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter(a) {
return newHystrixTargeter(); }}@ConditionalOnMissingClass("feign.hystrix.HystrixFeign")
protected static class DefaultFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter(a) {
return newDefaultTargeter(); }}Copy the code
In FeignClientsConfiguration configuration class, as you can see there are the following configuration decided to Feign. Builder, specific types of Feign. The Builder is Targeter used to construct Feign the constructor of the object. As you can see, feign.Builder is feign.Builder by default. If Hystrix is introduced and feign.hystrix.enabled=true, the actual type of feign. Builder is hystrixfeign. Builder, which will be examined later.
In other words, to enable Hystrix, Feign not only needs to add feign-hystrix dependency, but also needs to set feign.hystrix.enabled=true.
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
public Feign.Builder feignBuilder(Retryer retryer) {
return Feign.builder().retryer(retryer);
}
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public Feign.Builder feignHystrixBuilder(a) {
returnHystrixFeign.builder(); }}Copy the code
Reflection agent handler HystrixInvocationHandler
If hystrix is enabled, feign. Builder is hystrixfeign. Builder, so it follows the logic after if.
- Got the name of FeignClient, the default is the service name, which will be used as HystrixCommand’s
groupName
. - Then get the callback class or callback factory and build Feign.
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context, Target.HardCodedTarget
target)
{
if(! (feigninstanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
// HystrixCommand is the name of the service defined by Feign
String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName()
: factory.getContextId();
SetterFactory setterFactory = getOptional(name, context, SetterFactory.class);
if(setterFactory ! =null) {
builder.setterFactory(setterFactory);
}
/ / callback classClass<? > fallback = factory.getFallback();if(fallback ! =void.class) {
return targetWithFallback(name, context, target, builder, fallback);
}
// Call back the factoryClass<? > fallbackFactory = factory.getFallbackFactory();if(fallbackFactory ! =void.class) {
return targetWithFallbackFactory(name, context, target, builder, fallbackFactory);
}
return feign.target(target);
}
Copy the code
Either a callback class or a callback factory leads to build(fallbackFactory).
public <T> T target(Target<T> target, T fallback) {
returnbuild(fallback ! =null ? new FallbackFactory.Default<T>(fallback) : null)
.newInstance(target);
}
public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) {
return build(fallbackFactory).newInstance(target);
}
Copy the code
Moving on to the build() method of Hystrixfeign.Builder, you can see that the biggest difference is the InvocationHandler created when Hystrix is not enabled. When Hystrix is enabled, the InvocationHandler created by the anonymous InvocationHandlerFactory set is the HystrixInvocationHandler.
At the same time, to deal with component interface annotated interface agreement Contract is set to HystrixDelegatingContract, defaults to SpringMvcContract, SpringMvcContract HystrixDelegatingContract is actually the agent.
Feign build(finalFallbackFactory<? > nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target, Map
dispatch)
,> {
return newHystrixInvocationHandler(target, dispatch, setterFactory, nullableFallbackFactory); }});super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
Copy the code
Is not enabled hystrix, create the default InvocationHandler is ReflectiveFeign FeignInvocationHandler.
public interface InvocationHandlerFactory {
/ /...
static final class Default implements InvocationHandlerFactory {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return newReflectiveFeign.FeignInvocationHandler(target, dispatch); }}}Copy the code
HystrixDelegatingContract is actually a decorator, you can see, If FeignClient returns HystrixCommand, Observable, Single, Completable, CompletableFuture, it sets the return type of MethodMetadata to the actual type.
public final class HystrixDelegatingContract implements Contract {
private final Contract delegate;
public HystrixDelegatingContract(Contract delegate) {
this.delegate = delegate;
}
@Override
public List<MethodMetadata> parseAndValidateMetadata(Class
targetType) {
List<MethodMetadata> metadatas = this.delegate.parseAndValidateMetadata(targetType);
for (MethodMetadata metadata : metadatas) {
Type type = metadata.returnType();
if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(HystrixCommand.class)) {
Type actualType = resolveLastTypeParameter(type, HystrixCommand.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Observable.class)) {
Type actualType = resolveLastTypeParameter(type, Observable.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Single.class)) {
Type actualType = resolveLastTypeParameter(type, Single.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Completable.class)) {
metadata.returnType(void.class);
} else if (type instanceofParameterizedType && ((ParameterizedType) type).getRawType().equals(CompletableFuture.class)) { metadata.returnType(resolveLastTypeParameter(type, CompletableFuture.class)); }}returnmetadatas; }}Copy the code
Construct HystrxiCommand and execute it
Looking at the Invoke method of HystrixInvocationHandler, when the FeignClient interface is called, the invoke method enters the proxy object and is called to execute.
Hystrix is a proxy object that encapsulates the reflection call of the original method into HystrixCommand’s run() method, and then invokes different methods based on the return type. The default is to call the execute() method of HystrixCommand.
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
/ /...
/ / HystrixCommand construction
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run(a) throws Exception {
try {
// Reflection calls the original method in the run() method
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw(Error) t; }}// Overrides the method that gets the callback
@Override
protected Object getFallback(a) {
// Perform the callback}};if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
Copy the code
RestTemplate integration Hystrix
Use the @hystrixCommand annotation
Remote calls to RestTemplate also need to take into account fuses, degradations, etc., to avoid cascading failures. How does RestTemplate integrate with Hystrix?
We can encapsulate the remote call of RestTemplate into a method, annotate it with @hystrixCommand, and configure grouopKey, callback method and other parameters. Generally, groupKey can configure the service name of the remote call or the third party that provides the interface.
Hystrix must have added a section to intercept the execution of a method annotated with @HystrixCommand, and then, like Feign, integrated Hystrix to encapsulate the method call into HystrixCommand and execute the command.
@Service
public class ProducerWithHystrixService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand( groupKey = "demo-producer", fallbackMethod = "queryId_fallback" )
public String queryId(a) {
ResponseEntity<String> result = restTemplate.postForEntity("http://demo-producer/v1/uuid".new LinkedMultiValueMap<String, Object>(), String.class);
return result.getBody();
}
public String queryId_fallback(a) {
return "error"; }}Copy the code
Hystrix thread passing
Hystrix has two resource isolation modes: thread pool isolation and semaphore isolation. In the case of thread pool isolation, one issue to consider is that if a business uses ThreadLocal to store thread-local variables and HystrixCommand is executed in child threads. Local variables in ThreadLocal are not passed to child threads, so business logic executing in child threads cannot fetch local variables in ThreadLocal. Wouldn’t this affect the execution of the original business logic? How to deal with it?
The simplest solution is to change the isolation policy and use semaphore isolation mode, but Hystrix defaults to thread pool isolation mode and in real world scenarios uses thread pool isolation, which is not an option.
Second was the official recommended use HystrixConcurrencyStrategy Hystrix, realize wrapCallable method, copy the state of the thread in it.
Copy Hystrix thread context state
In the previous analysis HystrixContextSchedulerWorker scheduling, something is not analysis, see the schedule method code, The worker. The schedule of scheduling Action0 scheduling the Action is, in fact, context HystrixContexSchedulerAction. It is the original Action0 encapsulation, again create HystrixContexSchedulerAction when introduced into HystrixConcurrencyStrategy Action0 object and the original.
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if(threadPool ! =null) {
if(! threadPool.isQueueSpaceAvailable()) {throw new RejectedExecutionException("..."); }}return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
Copy the code
Then see HystrixContexSchedulerAction, in the constructor, the original Action0 is encapsulated into a Callable, HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext = HystrixRequestContext
Importantly, the rewrapped Callable is passed into the wrapCallable method of concurrencyStrategy, so this is also an extensible port that Hystrix gives us. This allows us to inject some custom actions, such as copying the thread state, before the Hystrix request is executed.
public class HystrixContexSchedulerAction implements Action0 {
private final Action0 actual;
private final HystrixRequestContext parentThreadState;
private final Callable<Void> c;
public HystrixContexSchedulerAction(Action0 action) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), action);
}
public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action0 action) {
this.actual = action;
// Main thread status
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {
@Override
public Void call(a) throws Exception {
// State of the child thread
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// The main thread state is set to the child thread
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// Execute the original Action0 in main thread state
actual.call();
return null;
} finally {
// Reset the state of the child threadHystrixRequestContext.setContextOnCurrentThread(existingState); }}}); }@Override
public void call(a) {
try {
c.call();
} catch (Exception e) {
throw new RuntimeException("Failed executing wrapped Action0", e); }}}Copy the code
Custom replication thread status
In Spring Security, the Security context SecurityContext is stored in ThreadLocal. In order for the SecurityContext to be passed to child threads, Spring – the cloud – netflix – the core module is custom SecurityContextConcurrencyStrategy security context concurrency strategy class. Below we will look at how to customize by SecurityContextConcurrencyStrategy HystrixConcurrencyStrategy to achieve the purpose of copying the thread state.
Define Callable to encapsulate raw Callable
The development steps are as follows:
- First, we define an implementation class for Callable. In the constructor, we need to store the original proxy object and the state in the main thread.
- In the implementation of the
call()
Method, first the state in the child thread is temporarily stored, and then the state in the main thread is set to the child thread. This step completes the purpose of copying the state of the main thread. - You can then call the proxy’s Callable.
- After the execution, it is usually in
finally
, resets the state of the child thread to its previous state.
public final class DelegatingSecurityContextCallable<V> implements Callable<V> {
// The Callable to delegate
private final Callable<V> delegate;
// SecurityContext in the main thread
private final SecurityContext delegateSecurityContext;
// SecurityContext in the child thread
private SecurityContext originalSecurityContext;
// The constructor passes in the Callable and SecurityContext in the main thread to broker
public DelegatingSecurityContextCallable(Callable<V> delegate, SecurityContext securityContext) {
this.delegate = delegate;
this.delegateSecurityContext = securityContext;
}
// The constructor passes in the Callable and SecurityContext in the main thread to broker
public DelegatingSecurityContextCallable(Callable<V> delegate) {
// Save the SecurityContext for the main thread
this(delegate, SecurityContextHolder.getContext());
}
@Override
public V call(a) throws Exception {
// Hold the SecurityContext of the child thread
this.originalSecurityContext = SecurityContextHolder.getContext();
try {
// Set the SecurityContext of the main thread to the child thread
SecurityContextHolder.setContext(delegateSecurityContext);
// Call the original Callable
return delegate.call();
} finally {
// Reset to SecurityContext for the atomic threadSecurityContextHolder.setContext(originalSecurityContext); }}}Copy the code
Custom Hystrix concurrency policy class
Then develop a custom Hystrix concurrency strategy class inherits from HystrixConcurrencyStrategy, have the following points:
- An existing concurrent policy class needs to be injected into the constructor, because Hystrix’s method to register a concurrent policy object can only be called once, and without injecting an existing concurrent policy class, it cannot be used with other policy classes. We’ll talk about that later.
- It then overloads all the methods of the parent class and returns information about other concurrency policies, such as request variables, thread pools, blocking queues, etc., when the method is called, without affecting the previous concurrency policy.
- Finally, in
wrapCallable
Method, create a custom DelegatingSecurityContextCallable, the packing is our custom action.
public class SecurityContextConcurrencyStrategy extends HystrixConcurrencyStrategy {
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
public SecurityContextConcurrencyStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
returnexistingConcurrencyStrategy ! =null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
returnexistingConcurrencyStrategy ! =null
? existingConcurrencyStrategy.getRequestVariable(rv)
: super.getRequestVariable(rv);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty
corePoolSize, HystrixProperty
maximumPoolSize, HystrixProperty
keepAliveTime, TimeUnit unit, BlockingQueue
workQueue)
{
returnexistingConcurrencyStrategy ! =null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
returnexistingConcurrencyStrategy ! =null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties)
: super.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
returnexistingConcurrencyStrategy ! =null
? existingConcurrencyStrategy.wrapCallable(new DelegatingSecurityContextCallable<T>(callable))
: super.wrapCallable(newDelegatingSecurityContextCallable<T>(callable)); }}Copy the code
Register a user-defined concurrency policy
Custom class concurrency strategy development is completed, you need to register to the Hystrix, can look at how to register in HystrixSecurityAutoConfiguration:
- First save the reference to the original Hystrix plug-in, and then reset the Hystrix plug-in (primarily to reset the concurrency policy object).
- Then a custom concurrent policy object is created. To create a concurrent policy object, you need to pass in an existing one.
- Then register the Hystrix plug-in again.
@Configuration
@Conditional(HystrixSecurityCondition.class)
@ConditionalOnClass({Hystrix.class, SecurityContext.class})
public class HystrixSecurityAutoConfiguration {
// Inject an existing concurrency policy class
@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct
public void init(a) {
// Save the original reference to the Hystrix plug-in
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
/ / reset
HystrixPlugins.reset();
// Register a new concurrency policy
HystrixPlugins.getInstance().registerConcurrencyStrategy(new SecurityContextConcurrencyStrategy(existingConcurrencyStrategy));
// Register other plug-ins
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
static class HystrixSecurityCondition extends AllNestedConditions {
public HystrixSecurityCondition(a) {
super(ConfigurationPhase.REGISTER_BEAN);
}
// Enable conditions
@ConditionalOnProperty(name = "hystrix.shareSecurityContext")
static class ShareSecurityContext {}}}Copy the code
Why reset and then re-register? Looking at these registration methods, you can see that these registration methods can only be called once, otherwise an exception will be thrown. So in order to avoid already registered, you need to reset and then re-register.
public void registerConcurrencyStrategy(HystrixConcurrencyStrategy impl) {
if(! concurrencyStrategy.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered."); }}public void registerEventNotifier(HystrixEventNotifier impl) {
if(! notifier.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered."); }}Copy the code
Hystrix enables SecurityContext replication
Can also be seen in the above code, HystrixSecurityAutoConfiguration effective configuration class is the premise of hystrix. ShareSecurityContext = true, therefore, If you want to in the environment of the spring security + hystrix, can in hystrix get SecurityContext child thread, need to configure the hystrix. ShareSecurityContext = true.
Hystrix summary
Hystrix configuration
Hystrix has a number of configurations, and you can find out which configurations and default configuration values in their Properties configuration class.
Fuse configuration
Fuse configurations and default values can be found in HystrixCommandProperties.
Configure isolation policies
Isolation policy configurations and default values can be found in HystrixCommandProperties.
Hystrix thread pool configuration
Hystrix thread pool configuration and related default values can be found in the HystrixThreadPoolProperties.
Yml configuration
In micro service, typically by yml file to configure, don’t use HystrixCommandProperties Setter () withCircuitBreakerEnabled (true) in this form, So how do you configure Hystrix global defaults and different groups?
The default global configuration uses default as the key:
hystrix:
threadpool:
# default 作为 key
default:
coreSize: 10
maximumSize: 20
maxQueueSize: 10
command:
# default 作为 key
default:
execution:
isolation:
strategy: THREAD
thread:
timeoutInMilliseconds: 5000
Copy the code
Use the name of the command as the key for a specific client:
hystrix:
threadpool:
# default
default:
coreSize: 10
maximumSize: 10
maxQueueSize: - 1
# demo-consumer
demo-consumer:
coreSize: 5
maximumSize: 5
maxQueueSize: 10
command:
## default
default:
execution:
isolation:
strategy: THREAD
thread:
timeoutInMilliseconds: 5000
# demo-producer
demo-producer:
execution:
isolation:
strategy: SEMAPHORE
thread:
timeoutInMilliseconds: 2000
Copy the code
For a method on a particular client, use # to separate the client name from the method name:
hystrix:
threadpool:
# demo-consumer
demo-consumer#sayHello(Long,String,Integer):
coreSize: 5
maximumSize: 5
maxQueueSize: 10
Copy the code
Overall Hystrix schematic diagram
Finally, use a diagram to summarize the previous Hystrix source code analysis principles.