Column series: SpringCloud column series
Series of articles:
SpringCloud source series (1) – registry initialization for Eureka
SpringCloud source code series (2) – Registry Eureka service registration, renewal
SpringCloud source code series (3) – Registry Eureka crawl registry
SpringCloud source code series (4) – Registry Eureka service offline, failure, self-protection mechanism
SpringCloud source series (5) – Registry EurekaServer cluster for Eureka
SpringCloud source Code Series (6) – Summary of the Registry Eureka
SpringCloud source series (7) – load balancing Ribbon RestTemplate
SpringCloud source series (8) – load balancing Ribbon core principles
SpringCloud source series (9) – load balancing Ribbon core components and configuration
SpringCloud source Series (10) – HTTP client component of load balancing Ribbon
SpringCloud Source Series (11) – Retries and summaries of the Load Balancing Ribbon
SpringCloud source Code Series (12) – Basic usage of Service invocation Feign
SpringCloud source Code Series (13) – Service invocation of Feign’s scanning @FeignClient annotation interface
SpringCloud source code series (14) – Service calls to Feign build @FeignClient interface dynamic proxy
SpringCloud source Series (15) – Service calls Feign with the Ribbon for load balancing requests
SpringCloud source code series (16) – Fuse Hystrix basic introduction
In this chapter, we will start with HystrixCommand construction and execute() execution, and take a step-by-step look at how Hystrix encapsulates the core principles of business logic, thread pool isolation, and fuse degradation.
It should be noted that The source code of Hystrix makes extensive use of RXJava responsive programming. The source code is filled with a large number of callbacks and Observable layer nesting. The source code operation process is not linear, so I will only show some core source code in the process of source analysis. So we can tease out the Hystrix design.
HystrixCommand components
HystrixCommand is an abstract class that inherits from AbstractCommand. Its core logic is all in AbstractCommand. HystrixCommand is relatively simple, mainly overloading several methods. So let’s first look at the structure of the HystrixCommand component.
HystrixCommand structure
Look at the HystrixCommand class structure diagram, notice the logo on the left, you can get the following information:
- HystrixCommand provides multiple constructors, but they are protected and require subclasses to implement a constructor
run()
Method is an abstract method that needs to be subclassed, meaning that our business logic is wrapped in the run() methodexecute()
,queue()
Is a public method used to execute commandsgetExecutionObservable()
,getFallbackObservable()
GetExecutionObservable () is the parent class that gets the Observable executing the command, GetFallbackObservable () is the parent class that gets the callback method Observable.- Other methods are as follows
getFallback()
,getFallbackMethodName()
They are protected and can be overridden by subclasses.
Gets the execution subscription object
AbstractCommand getExecutionObservable() is an abstract method that implements its parent class AbstractCommand. This method gets the run() method subscription object Observable, as you can see from its name or source code.
@Override
final protected Observable<R> getExecutionObservable(a) {
// defer: The call() method is triggered only when the object is subscribed
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
try {
// Subscribing to this Observable triggers the run() method
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call(a) {
// Save the current thread when subscribingexecutionThread.set(Thread.currentThread()); }}); }Copy the code
Likewise, getFallbackObservable() is the subscription object Observable that gets the callback method.
@Override
final protected Observable<R> getFallbackObservable(a) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
try {
// Returns a subscription object that performs the callback method
return Observable.just(getFallback());
} catch (Throwable ex) {
returnObservable.error(ex); }}}); }Copy the code
Command Execution entry
HystrixCommand has four methods that can be called to execute commands: Execute (), queue(), observe(), toObservable(), and toObservable() are all implemented by calling toObservable(). Observe () and toObservable() both return an Observable. Calling.toblocking () triggers execution of the subscribed object, while toFuture() returns a Future that executes asynchronously. Calling the Future’s get() method can block synchronously and wait for execution results, thus ultimately implementing the different features of the four methods.
public R execute(a) {
return queue().get();
}
public Future<R> queue(a) {
// All calls end up in toObservable()
final Future<R> delegate = toObservable().toBlocking().toFuture();
// delegate does some handling after the delegate executes the exception
final Future<R> f = new Future<R>() {
/ /...
@Override
public boolean isDone(a) {
return delegate.isDone();
}
@Override
public R get(a) throws InterruptedException, ExecutionException {
return delegate.get();
}
/ /...
};
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
//....}}return f;
}
Copy the code
HystrixCommand initialization
AbstractCommand constructor
HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey HystrixCommandGroupKey The default values are used if the remaining parameters are null.
The initialization process of HystrixCommand is relatively simple. It mainly initializes some command configurations and components.
- CommandGroup: specifies the commandGroup. The parameter is mandatory.
- CommandKey: Specifies the command name. The default is the class name. To configure the command, run the commandKey command.
- Properties: Command property configuration. The default configuration can be viewed
HystrixCommandProperties
This class. - ThreadPoolKey: specifies the name of the thread pool. The default value is the same as the group name
hystrix-{groupKey}-{number}
. - Metrics: Measures HystrixCommandMetrics, which measures the success, failure, and timeout of hystrix Command execution.
- CircuitBreaker: HystrixCircuitBreaker, which determines whether the circuitBreaker is on when running a command.
- ThreadPool: threadPool component HystrixThreadPool. When threadPool is isolated, tasks are thrown into the threadPool to execute, isolating resources.
- EventNotifier: Time notification component HystrixEventNotifier
- HystrixConcurrencyStrategy concurrencyStrategy: concurrent strategy
- ExecutionHook: HystrixCommandExecutionHook
- RequestCache: HystrixRequestCache
- CurrentRequestLog: HystrixRequestLog. The default value is null.
// All parameters are non-mandatory except group parameters
protected AbstractCommandHystrixCommandKey group, // Group name HystrixCommandKey key, // command name HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, . / / Hystrix thread pool HystrixCommandProperties Setter commandPropertiesDefaults, / / configuration HystrixThreadPoolProperties Setter threadPoolPropertiesDefaults, / / the thread pool configuration HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, / / semaphore current limiter HystrixPropertiesStrategy propertiesStrategy, / / Hystrix allocation strategy component HystrixCommandExecutionHook executionHook) { // Hook trace Hystrix command execution
// Command grouping
this.commandGroup = initGroupKey(group);
GetClass ().getSimplename ()
this.commandKey = initCommandKey(key, getClass());
/ / the Command configuration, the default value is commandPropertiesDefaults
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
// poolKeyOverride is used first, otherwise groupKey is used if threadPoolKey is empty
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
// Measure statistics component: HystrixCommandMetrics
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
// Circuit breaker: HystrixCircuitBreaker
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
Hystrix thread pool: HystrixThreadPool
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
//Strategies from plugins
// Time notification: HystrixEventNotifier
this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
/ / concurrent strategy: HystrixConcurrencyStrategy
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
/ / Hook: HystrixCommandExecutionHook
this.executionHook = initExecutionHook(executionHook);
HystrixRequestCache
this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
HystrixRequestLog, null by default
this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
// Semaphore callback override
this.fallbackSemaphoreOverride = fallbackSemaphore;
// Semaphore override
this.executionSemaphoreOverride = executionSemaphore;
}
Copy the code
Component initialization
Take the HystrixCircuitBreaker initialization as an example to see how these components are initialized.
The initialization steps are basically similar, with the default component initialized if null is passed in as an AbstractCommand constructor argument. Each component has an inner class Factory, which provides a getInstance method to get the component. Factory uses a ConcurrentHashMap to cache components corresponding to different commands to avoid repeated creation. GetInstance () obtains components from the local cache first, and creates default components if none exist and places them in the local cache.
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
if (enabled) {
if (fromConstructor == null) {
// Get the default circuit breaker from Factory
return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
} else {
returnfromConstructor; }}else {
// Disable circuit breakers and return implementation classes that do nothing
return newNoOpCircuitBreaker(); }}Copy the code
HystrixCircuitBreaker. Factory:
public static class Factory {
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
// Check the local cache to see if a circuit breaker component with the same name has been created
HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
if(previouslyCached ! =null) {
return previouslyCached;
}
// Create a default that has not yet been created
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
return circuitBreakersByCommand.get(key.name());
} else {
returncbForCommand; }}}Copy the code
Gets the execution subscription object
ToObservable () gets the subscription object
AbstractCommand’s toObservable() method returns the final subscription object for command execution, but its internal encapsulation of the run() method is very complex, so let’s focus on the big and the small. Let’s take a look at the overall flow of toObservable returning the Observable from the subscription, until we find out where the run() method is executed.
The general process is as follows:
-
Action0 => terminateCommandCleanup
-
Define command that cancels the execution of the after back to mobilize a Action0 = > unsubscribeCommandCleanup.
-
Func0 => applyHystrixSemantics is the core semantics that defines the application of hystrix. The implementation of this callback is shown because it encapsulates the run() method.
-
Define a transformation Hook Func1 => wrapWithAllOnNextHooks.
-
Define the callback after Hook completion => fireOnCompletedHook.
-
The last step is to create an Observable subscription. Let’s see what it does:
- Check the status of the command first. Non-not_started raises an exception. Otherwise, change the status of the command to OBSERVABLE_CHAIN_CREATED
- Set the time when the command is executed
- Determines whether request logging is enabled. The currentRequestLog is null by default
- To enable request caching, you need to reset the cached Key returned by the getCacheKey() method
- If request caching is enabled, it is first fetched from the HystrixRequestCache cache component, and if it already exists, the subscription to the cache object is returned directly
- If the cache is not enabled or not in the cache, the subscription object is created
applyHystrixSemantics
The subscription object returned - If caching is enabled, encapsulate the subscription object and cache the data after the request ends. Otherwise, use the Observable you created earlier
- The last step is to return the subscription object created earlier and set the callback action defined
ToObservable () To summarize, there are actually two core places:
- One is the
applyHystrixSemantics
Is where the core business logic is encapsulated; - The other is if the request cache is enabled, the request is fetched from the cache first, the command is not executed, and the result is cached after the request is completed.
public Observable<R> toObservable(a) {
// _cmd => Current command object
final AbstractCommand<R> _cmd = this;
// Some actions after the command is executed
final Action0 terminateCommandCleanup = newAction0() {... };// Command to cancel some actions
final Action0 unsubscribeCommandCleanup = newAction0() {... };// Apply Hystrix's core semantics section, Hystrix execution entry
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// Execute the hystrix request
returnapplyHystrixSemantics(_cmd); }};// Do some conversions to the original command
final Func1<R, R> wrapWithAllOnNextHooks = newFunc1<R, R>() {... };// Execute the successful action
final Action0 fireOnCompletedHook = newAction0() {... };// defer: Does not execute immediately, calling toBlocking() before executing the call() method
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
// Set the state to OBSERVABLE_CHAIN_CREATED. If the initial state is not NOT_STARTED, an exception will be thrown
if(! commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {throw newHystrixRuntimeException(...) ; }// Set the command start time (note that this is set only when you actually subscribe)
commandStartTimestamp = System.currentTimeMillis();
CurrentRequestLog is null by default
if(properties.requestLogEnabled().get() && currentRequestLog ! =null) {
currentRequestLog.addExecutedCommand(_cmd);
}
// Whether request caching is enabled and cacheKey is not empty
final boolean requestCacheEnabled = isRequestCachingEnabled();
To enable request caching, override the getCacheKey() method
final String cacheKey = getCacheKey();
// First fetch from the cache
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if(fromCache ! =null) {
isResponseFromCache = true;
// Return the cached data directly
returnhandleRequestCacheHitAndEmitValues(fromCache, _cmd); }}// Subscribe applyHystrixSemantics returns the subscribe object
Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// With request caching enabled, subscribe to hystrixObservable again and cache the results after execution
if(requestCacheEnabled && cacheKey ! =null) {
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
// The subscription object is placed in the cache
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
// Unsubscribe if the cache already exists and return the contents of the cache
if(fromCache ! =null) {
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else{ afterCache = toCache.toObservable(); }}// No caching is the original hystrixObservable
else {
afterCache = hystrixObservable;
}
// Returns the subscription object
return afterCache
.doOnTerminate(terminateCommandCleanup) // The command is executed
.doOnUnsubscribe(unsubscribeCommandCleanup) // Execute after the command is cancelled
.doOnCompleted(fireOnCompletedHook); // Execute the command}}); }Copy the code
Apply Hystrix circuit breakers or semaphores
ApplyHystrixSemantics (_cmd) is the semantics that encapsulates Hystrix, but we haven’t seen run() yet, so let’s look at what it does.
-
First issue a Hook to tell the command to execute.
-
Then use the breaker to determine whether to allow the request. If the breaker rejects it, for example, the breaker is in the open state, it directly degrades.
-
If the breaker allows the request, get a TryableSemaphore, if the semaphore mode is returned is TryableSemaphoreActual; Thread pool mode returns TryableSemaphoreNoOp, which does nothing but let go.
-
The action Action0 => singleSemaphoreRelease that defines whether or not a semaphore license occurs after the request ends.
-
Action1 => markExceptionThrown defines the action Action1 => markExceptionThrown to notify the exception.
-
After obtaining the semaphore license, failure to obtain the semaphore will enter the semaphore refuse degradation
-
Once the semaphore license is obtained, the time to start execution is set
-
Finally, subscribe again through the executeCommandAndObserve(_cmd) method and set the error callback, end callback, and cancel callback
To summarize, applyHystrixSemantics(_cmd) is all about applying circuit breakers or semaphores to limit the flow of a request.
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// The command is executed
executionHook.onStart(_cmd);
// Whether the breaker allows requests
if (circuitBreaker.allowRequest()) {
// Get Semaphore, Semaphore mode returns TryableSemaphoreActual, thread pool mode returns TryableSemaphoreNoOp
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// Release the semaphore license when the request ends
final Action0 singleSemaphoreRelease = newAction0() {... };// Notify when an exception is thrown
final Action1<Throwable> markExceptionThrown = newAction1<Throwable>() {... };Thread pool mode always returns true for the semaphore license
if (executionSemaphore.tryAcquire()) {
try {
// Set the start time of execution. This should be the time when the command is actually executed
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// Execute the command and subscribe
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown) // Execute after error
.doOnTerminate(singleSemaphoreRelease) // Execute the command
.doOnUnsubscribe(singleSemaphoreRelease); // Execute after canceling execution
} catch (RuntimeException e) {
returnObservable.error(e); }}else {
// Semaphore rejected => Degraded
returnhandleSemaphoreRejectionViaFallback(); }}else {
// Short circuit => Degraded
returnhandleShortCircuitViaFallback(); }}Copy the code
Subscription object timeout processing
ApplyHystrixSemantics (_cmd) calls executeCommandAndObserve(_cmd) to get the subscription object.
-
First, I also created a few callback objects
-
The core is in the last few steps, invoke executeCommandWithSpecifiedIsolation (_cmd) to get a subscription object, the method name should be used Hystrix isolation strategy.
-
If you enable timeouts, subscribe to the object will also increase a timeout processor HystrixObservableTimeoutOperator, in can be found that the processor creates a TimerListener to change the status to isCommandTimedOut timeout, So this corresponds to this. We’ll talk about timeouts later.
To sum up, the core of executeCommandAndObserve(_cmd) is to add a timeout handler to the subscription object in response to a timeout if it is enabled.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// Mark that the command has been executed
final Action1<R> markEmits = newAction1<R>() {... };// Indicates that the command execution is complete
final Action0 markOnCompleted = newAction0() {... };// Handle the callback
final Func1<Throwable, Observable<R>> handleFallback = newFunc1<Throwable, Observable<R>>() {... };// Set the current thread
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? superR>>() {... }; Observable<R> execution;Observable that creates the run() method continues encapsulation according to the configured isolation policy. Thread pool isolation mode is put into the thread pool for scheduling, and semaphore mode returns directly
if (properties.executionTimeoutEnabled().get()) {
/ / overtime will be handled by HystrixObservableTimeoutOperator, throw HystrixTimeoutException timeout exception
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// Set the subscription callback
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
Copy the code
Execute the command under the configured isolation policy
Then look at executeCommandWithSpecifiedIsolation (_cmd) method, the method name will know it’s the command to configure isolation strategy execution. You can see that the first if… else… Split into thread pool isolation and semaphore isolation.
Thread pool isolation mode:
-
Change the status of the command from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED to indicate that the user code is executed.
-
Run the isCommandTimedOut command to check whether the command times out. If the command times out, an exception is thrown and the command times out before the command is executed. This can happen if the thread pool is full, the command is waiting in a queue, and times out while waiting. Remember the isCommandTimedOut thing, which is set somewhere else.
-
Hystrix. StartCurrentThreadExecutingCommand (getCommandKey ()) seems to be began to execute the command, but its interior is just put HystrixCommandKey in a stack of stack, The returned endCurrentThreadExecutingCommand is at the end of the command execution Will HystrixCommandKey pop up from the stack. It’s not clear what it does.
-
Through getUserExecutionObservable (_cmd) for the user to perform subscription object, this method is the final packaging the run () method returns the subscription.
-
The biggest difference between thread pool isolation and semaphore isolation is that in the last step, thread pool isolation has a subscribeOn Scheduler, This scheduler is an Rx.Scheduler object obtained by calling ThreadPool.getScheduler (Func0 func). The actual type is HystrixContextScheduler. The Observable returned by call() is thrown into the Scheduler for asynchronous scheduling, so this will be the entry point for thread pool isolation.
Signal isolation is relatively easier, final step by same getUserExecutionObservable (_cmd) to obtain the run () method of subscription object.
To summarize, the core of this method is to return a subscription object to run() and isolate resources according to the command’s isolation policy. Thread pool isolation subscribing to a Scheculer for scheduled execution, and you can assume that the interior will eventually be thrown into a thread pool for execution.
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
Thread pool isolation
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
// The status changed from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED, indicating that the code for the run() method is executed
if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(newIllegalStateException(...) ); }// The command is executed
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// To check whether the command timed out, isCommandTimedOut is updated in TimerListener
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// Increase the number of execution threads
HystrixCounters.incrementGlobalConcurrentThreads();
// The statistics thread starts executing
threadPool.markThreadExecution();
// Push the command Key to the top of a stack
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
// issue some Hook notifications...
// Get the Observable of Run ()
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnTerminate(new Action0() {
// Update the thread status after the command is executed...
}).doOnUnsubscribe(new Action0() {
// Update thread status after the command is canceled...
})
// Place the Observable returned by defer into a scheduler to execute asynchronously. The scheduler ==> HystrixContextScheduler
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call(a) {
// Determine whether to interrupt thread execution
returnproperties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; }})); }// Semaphore isolation
else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
// The status changed from OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED, indicating that the code for the run() method is executed
if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(newIllegalStateException(...) ); }// Statistics: the semaphore default command starts to execute
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// Push the command Key to the top of a stack
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
// issue some Hook notifications...
// Get the Observable of Run ()
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
returnObservable.error(ex); }}}); }}Copy the code
Gets the subscription object for business code execution
In the end, finally find enclosed the run () method, getUserExecutionObservable (_cmd) method is simpler, Call HystrixCommand’s getExecutionObservable() to get the subscription object that executes Run (), which is our custom business code.
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
// Encapsulates the overloaded run() method
userObservable = getExecutionObservable();
} catch (Throwable ex) {
userObservable = Observable.error(ex);
}
return userObservable
// Hystrix performs callback processing
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
Copy the code
GetExecutionObservable () in HystrixCommand:
final protected Observable<R> getExecutionObservable(a) {
// defer: The call() method is triggered only when the object is subscribed
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
try {
// Subscribing to this Observable triggers the run() method
return Observable.just(run());
} catch (Throwable ex) {
returnObservable.error(ex); }}})}Copy the code
A diagram summarizes the process of getting a subscription object
ToObservable () is the entry point for retrieving the run() business logic subscription object. To add hystrix features, layers are embedded to create the final Observable. Basically, each submethod adds a feature to the subscription object.
toObservable()
: Entry to get a subscription object.applyHystrixSemantics(_cmd)
: Apply a Hystrix circuit breaker or semaphore to reject downgrade when the circuit breaker is on or unable to acquire the semaphore.executeCommandAndObserve(_cmd)
If timeout is enabled, add a timeout handler to the subscribed object,executeCommandWithSpecifiedIsolation
: Returns different subscription objects according to the configured isolation policy. Thread pool isolation throws the subscription object into oneHystrixContextScheduler
To schedule execution.getUserExecutionObservable(_cmd)
: returns the subscription object that truly encapsulates the run() business logic.