Rely on the following
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.18</version>
</dependency>
Copy the code
What is the Hystrix
2018.11 released the last version, currently dealing with the maintenance phase, no upgrade version
-
USES:
-
- Stop the cascading fault. Fallback and elegant downgrades, Fail Fast and quick recoveries
-
- Monitor and configure real-time changes
-
- Resource isolation. Partial unavailability does not cause system unavailability
-
Scenario: On the commodity list interface, data such as red packets, prices, and labels need to be obtained. You can give this guy a thread pool. If the thread pool is full, the non-commodity list interface of the current service is not affected
-
Use: the framework of main use Rxjava hystrix, fit may refer to: www.jianshu.com/p/5e93c9101…
Perform the entrance
Hystrix executes with command as the entry point. AbstractCommand implements the Command Command almost all logic, there are two subclasses were HystrixCommand HystrixCommand, HystrixObservableCommand 99% scene, so the following only explain this Command class, It provides two methods: execute() synchronous execution and queue() asynchronous execution
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
/ /... Omit all constructors
/ /... Omit the static configuration inner class Setter
// The thread used for execution
private final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);
Execute () or queue calls the run() method **/
protected abstract R run(a) throws Exception;
/** * a failure to call execute() or queue() degrades the call to getFallback(). Don't realize the current method, will throw an UnsupportedOperationException abnormal * * * / by default
protected R getFallback(a) { throw new UnsupportedOperationException("No fallback available."); }
/** * Whether to customize the failed method, if so, put it in commandContainsFallback map */
@Override
protected boolean isFallbackUserDefined(a) {
Boolean containsFromMap = commandContainsFallback.get(commandKey);
if(containsFromMap ! =null) {
return containsFromMap;
} else {
Boolean toInsertIntoMap;
try {
getClass().getDeclaredMethod("getFallback");
toInsertIntoMap = true;
} catch (NoSuchMethodException nsme) {
toInsertIntoMap = false;
}
commandContainsFallback.put(commandKey, toInsertIntoMap);
returntoInsertIntoMap; }}/** * commandIsScalar=true, In makeEmits will circuitBreaker. MakeSuccess () * - in HystrixObservableCommand to false * /
@Override
protected boolean commandIsScalar(a) { return true; }
/** * The instruction used for synchronous execution */
public R execute(a) {
try {
// Queue is a Future. Get blocks the result, so execute() is a synchronous instruction
// Finally queue() is called
return queue().get();
} catch (Exception e) { throwExceptions.sneakyThrow(decomposeException(e)); }}/** * Instructions for asynchronous execution * commands are queued in the thread pool and return the Future for the result */
public Future<R> queue(a) {
ToObservable () : executes commands asynchronously with callbacks by subscribing to {@Link Observable}.
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
/** Interrupt the running method **/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) { return false; }
if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}
final boolean res = delegate.cancel(interruptOnFutureCancel.get());
if(! isExecutionComplete() && interruptOnFutureCancel.get()) {final Thread t = executionThread.get();
if(t ! =null && !t.equals(Thread.currentThread())) {
// Finally interrupt() is calledt.interrupt(); }}return res;
}
@Override
public boolean isCancelled(a) { return delegate.isCancelled(); }
@Override
public boolean isDone(a) { return delegate.isDone(); }
@Override
public R get(a) throws InterruptedException, ExecutionException { return delegate.get(); }
@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
returndelegate.get(timeout, unit); }};/** Special handling of the immediately thrown error state. But the Future was thrown wrong right after it was created? * * /
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
// Get the exception to be thrown
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throwhre; }}else {
throwExceptions.sneakyThrow(t); }}}returnf; }}Copy the code
ToObservable () method
HystrixCommand has execute() and queue() methods. ToObservable () in queue() is then called. ToObservable ()
- ApplyHystrixSemantics ()
- Determines whether the thread is NOT_STARTED, otherwise HystrixRuntimeException is thrown, and CAS ensures that the current command execution is unique
- Use HystrixRequestLog to log the execution of this command (requestLogEnabled = false to turn off logging)
- When request caching is enabled, the data is fetched from the cache
3.1 requestCacheEnabled = true && getCacheKey()! = null (so do not return null when overwriting the cache method, otherwise it does not take effect) If caching is not enabled or missed, execute the target command to get results. 4.1 Observable.defer() The target method does not execute immediately and requires a subscription to execute asynchronously. ApplyHystrixSemantics () ¶ TerminateCommandCleanup: flag the thread status as TERMINAL 6.1.1 object code is not executed (e.g., the result from the cache) : HystrixCommandMetrics#markCommandDone(), Trigger execution after the completion of the function callback (if endCurrentThreadExecutingCommand not null) 6.1.2 target execution execution. Using markCommandDone (true) tag 6.2 unsubscribeCommandCleanup will write thread state is UNSUBSCRIBED. Trigger executionHook. 6.3 fireOnCompleteHook onUnsubscribe only trigger executionHook. OnSuccess
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
public Observable<R> toObservable(a) {
// Omit the previous Action and Fun
// Create an Observable with observable.defer ()
return Observable.defer(() -> {
// This is a stateful object, so it can only be used once (CAS substitution limits entry to once only)
/* this is a stateful object so can only be used once */
if(! commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex =new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
// Enter the command start time
commandStartTimestamp = System.currentTimeMillis();
// Check whether I request logging is enabled
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if(currentRequestLog ! =null) { currentRequestLog.addExecutedCommand(_cmd); }}// Whether request caching is enabled
final boolean requestCacheEnabled = isRequestCachingEnabled();
// -- cacheKey is null by default, so it is disabled by default
final String cacheKey = getCacheKey();
// If the cache is enabled, it is first fetched from the cache
/* try from cache first */
if (requestCacheEnabled) {
// fetch it from the cache
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if(fromCache ! =null) {
// If not empty. IsResponseFromCache = true and return data
isResponseFromCache = true;
returnhandleRequestCacheHitAndEmitValues(fromCache, _cmd); }}// Wrap as hystrixObservable
// Get the command Observable
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
// Get the cache Observable
Observable<R> afterCache;
// Whether to push into the cache (this is done only if the cacheKey is not empty. The default cacheKey is empty, which requires setting.)
// put in cache
if(requestCacheEnabled && cacheKey ! =null) {
// wrap it for caching
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if(fromCache ! =null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return itafterCache = toCache.toObservable(); }}else {
afterCache = hystrixObservable;
}
return afterCache
// Listen when a subscription is about to be terminated, either normally or abnormally
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
// Listen when unsubscribing
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
// Observable terminates normally when listening.doOnCompleted(fireOnCompletedHook); }); }}Copy the code
ApplyHystrixSemantics () method
The final execution target method calls applyHystrixSemantics() as follows:
- Determine whether the circuit breaker allowed to execute circuitBreaker. AllowRequest (). It is not allowed to execute a fallback directly
- Attempt to obtain semaphore resources. Thread pool isolation mode will use TryableSemaphoreNoOp and return true
- Execute the target method executeCommandAndObserve(). Failure will perform circuit logic, call handleSemaphoreRejectionViaFallback ()
class AbstractCommand {
/ /... Omit the other
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// Mark it up before execution
// There are many executionHook and eventNotifier operations in the source code, which is an extension of Hystrix. Nothing is done in this, leaving a hole for developers to expand
executionHook.onStart(_cmd);
// Determine whether the circuit breaker is allowed to run
/ / - opens the circuit breaker called (withCircuitBreakerEnabled (true)) : HystrixCircuitBreakerImpl
/ / - closed circuit breaker called (withCircuitBreakerEnabled (false)) : NoOpCircuitBreaker. Returns true
if (circuitBreaker.allowRequest()) {
// Get the execution semaphore. If no configuration semaphore mode, return TryableSemaphoreNoOp. DEFAULT
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = () -> {
if (semaphoreHasBeenReleased.compareAndSet(false.true)) {
/ / if it is TryableSemaphoreNoOp. DEFAULT, that is an empty method of the operationexecutionSemaphore.release(); }};final Action1<Throwable> markExceptionThrown = t -> eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
// Determine whether the semaphore is rejected
// Thread pool mode will use TryableSemaphoreNoOp, which returns true
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// key !!!! : Handles quarantine policies and Fallback policies ()
// -- executeCommandAndObserve handles isolation policies and various fallbacks. The target method will be implemented eventually!!
// -- executeCommandAndObserve handles isolation policies and various fallbacks. The target method will be implemented eventually!!
// -- executeCommandAndObserve handles isolation policies and various fallbacks. The target method will be implemented eventually!!
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
returnObservable.error(e); }}else {
returnhandleSemaphoreRejectionViaFallback(); }}else {
// The request is not accepted. The fallback() method is executed
returnhandleShortCircuitViaFallback(); }}}Copy the code
Implement target method
The core method for executing the target method is executeCommandAndObserve()
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
/** * Handles isolation policies and various fallbacks. Ultimately, the !!!!!!! of the target method is executed * This decorates "Hystrix" functionality around the run() Observable. */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// Execution context. Ensure that the main thread parameters are also available in the thread pool
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 1. Record the result result events for: SUCCESS * 2. Closed circuitBreaker circuit breaker (if has been closed to ignore bai) * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
final Action1<R> markEmits = r -> {
// Whether data should be reported in the onNext step
// HystrixCommand -> false
// HystrixObservableCommand -> true
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
if (commandIsScalar()) {
// Whether the command is a scalar
// HystrixCommand -> true
// HystrixObservableCommand -> false
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// This code is important:
// Record the result as SUCCESS
/ / and, and, and circuitBreaker. MarkSuccess (); (If the circuit breaker is open, it is closed.)
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); }};/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * 1. Ensure that the Scala type results can also be normally closed circuit breaker and marked Success * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
final Action0 markOnCompleted = () -> {
if(! commandIsScalar()) {long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
// These lines of code are important: markEmits works the same as above. However, commandIsScalar() == false is called in the case of HystrixObservableCommand
// Record the result as SUCCESS
/ / and, and, and circuitBreaker. MarkSuccess (); (If the circuit breaker is open, it is closed.)
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); }};/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * when an exception is thrown during the execution of a target method (program might be the problem, may be a timeout, etc.), will enter here to deal with, handle a case can be divided into two categories: * * 1. Trigger fallback function * 2. Do not trigger the fallback function * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
// Throw Throwable t into Exception e.
// If t is an NPE exception, then t and e are exactly the same.
// t is not equal to e only if t is an error class
Exception e = getExceptionFromThrowable(t);
// Record the exception e during execution
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
// Thread pool rejected
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
// Target method execution timed out
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
// Detailed breakdown below
return handleBadRequestByEmittingError(e);
} else {
/* * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */
/ / out way, only when the subclasses autotype getExceptionFromThrowable () method is likely to enter here
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
returnhandleFailureViaFallback(e); }}};final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); }}; Observable<R> execution;/ / is focused on: executeCommandWithSpecifiedIsolation ()
/ / is focused on: executeCommandWithSpecifiedIsolation ()
if (properties.executionTimeoutEnabled().get()) {
// Timeout support is enabled. Much. Lift (new HystrixObservableTimeoutOperator < R > (_cmd)) call
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
// Timeout support is not enabled
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// Once you've got execution, start registering some basic events, observers
// -- doOnNext() : call before the observer is called back (when the data has already been sent, i.e. the target method has already been executed)
return execution.doOnNext(markEmits)
// -- doOnCompleted() : called on normal completion
.doOnCompleted(markOnCompleted)
// -- onErrorResumeNext() : called when executing an error
.onErrorResumeNext(handleFallback)
// -- doOnEach() : each call is executed. The request context is set for the child threads to complete cross-thread communication
.doOnEach(setRequestContext);
}
/** * The actual execution of the target method (handled depending on the isolation type specified: THREAD or SEMAPHORE) * -- THREAD: THREAD pool isolation (default) * -- SEMAPHORE: SEMAPHORE isolation */
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// Thread pool isolation (default)
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call(a) {
// Set up a thread pool to isolate occupancy and record data
executionResult = executionResult.setExecutionOccurred();
// The thread state must be OBSERVABLE_CHAIN_CREATED for execution
// This state is set by toObservable()
if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// Collecting indicator information: The system starts to collect indicator information
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// The run() method has not been executed. It timed out during a thread switch and returns an exception
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// CAS sets the ThreadState to ThreadState.STARTED
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
// Thread global counter +1. Semaphore does not need this counter
HystrixCounters.incrementGlobalConcurrentThreads();
// marks the thread thread ready to start execution
threadPool.markThreadExecution();
// store the command that is being run
// This save uses ThreadLocal
> to bind to the current thread
// This ensures thread-safe execution of the command
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/** * If any of these hooks throw an exception, then it appears as if the actual execution threw an error */
try {
// Execute the hook program and the target run method program
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
/ / getUserExecutionObservable: getExecutionObservable () abstract methods to get the target method
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
returnObservable.error(ex); }}else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(() -> {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}).doOnUnsubscribe(() -> {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}).subscribeOn(threadPool.getScheduler(() -> properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT));
} else {
return Observable.defer(() -> {
executionResult = executionResult.setExecutionOccurred();
if(! commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
returnObservable.error(ex); }}); }}}Copy the code
Methods the drop
How to call a target method
What’s a downgrade
Hystrix triggers fallback degradation logic in 5 cases:
- Short – circuited short circuit
- Threadpool -rejected indicates the rejected of the threadpool
- Semaphore – Rejected Indicates the semaphore rejected
- Time – out timeout
- Failed Execution failed
But in addition to the above types, HystrixBadRequestException exception (not trigger back, don’t calculate failure indicator) will not trigger a fallback mechanism. Used in scenarios such as handling 400 error codes
The downgrade flow chart is as follows: