1. Basic process
- Create commands, two Command types
- Execute Command in four ways
- Check whether request cache is enabled and whether request cache exists. If yes, read the request cache directly
2. Basic components
2.1 the Request Cache
First, there is the concept of a request context. Each request will be in the filter, and each request will be given a request context. In the context of a request, if there are multiple commands, their parameters will be the same, and the interface will be the same. In fact, it can be thought of the same way, in that case, we can have the result of the first Command executed, cached in memory, and then in the context of the request, all subsequent calls to that dependency are cached out of memory.
HystrixObservableCommand and HystrixObservableCommand can both specify a cache key, and hystrix will cache it automatically, and then when the same request context is accessed, the cache will be fetched directly. Avoid repeating network requests.
For Request Cache, Request consolidation, Request Log, etc., you must manage the declaration cycle of HystrixRequestContext yourself
2.2 Fallback degradation mechanism
Hystrix calls various interfaces, or accesses external dependencies, and an exception occurs
Each external dependency can only be accessed with a certain amount of resources, reject, thread pool/semaphore, and if the resource pool is full
Accessing an external dependency exposes a TimeOutException if a timeout occurs
When the proportion of abnormal events found by the short-circuit device reaches a certain proportion, the short-circuit is directly started
In all cases, fallback is implemented. For example, we can use the local cache as a memory cache based on lRU automatic cleaning. If this happens, we can try to fetch data from the cache, or return a default value
@Override
protected User getFallback(a) { // fallback
return new User("fallBack"."back"."back");
}
Copy the code
2.3 short circuiter
- If after a short circuit flow rate reached a certain threshold, HystrixCommandProperties. CircuitBreakerRequestVolumeThreshold ();
- If the breaker statistics to abnormal call ratio reached a certain threshold, the HystrixCommandProperties. CircuitBreakerErrorThresholdPercentage ();
- The short-circuiter switches from close to Open
When the short-circuiter is turned on, all the requests that pass through the short-circuiter are short-circuited, and the fallback mechanism is directly used instead of calling the back-end service
After a period of time, * * HystrixCommandProperties circuitBreakerSleepWindowInMilliseconds (), * * will be half – open, make a request through the short, can see the normal call, If the call succeeds, it automatically resumes and goes to the close state
3. Feign and Hystrix integration parameters
hystrix:
command:
default:
execution:
isolation:
strategy: THREAD
thread:
timeoutInMilliseconds: 1000
interruptOnTimeout: true
semaphore:
maxConcurrentRequests: 10
timeout:
enabled: true
circuitBreaker:
enabled: true
requestVolumeThreshold: 20
sleepWindowInMilliseconds: 5000
errorThresholdPercentage: 50
Copy the code
3.1 Thread pool-related properties
Hystrix. Threadpool. Default. CoreSize: the thread pool size, 10 by default
Hystrix. Threadpool. Default. MaximumSize: thread pool maximum size, 10 by default
Hystrix. Threadpool. Default. AllowMaximumSizeToDivergeFromCoreSize: whether to allow dynamic adjust the number of threads, the default false, only the set to true, the maximumSize above is valid
Hystrix. Threadpool. Default. KeepAliveTimeMinutes: the default is 1, beyond the thread of coreSize, free release after 1 minute
Hystrix. Threadpool. Default. MaxQueueSize default – 1, cannot be modified dynamically
Hystrix. Threadpool. Default. QueueSizeRejectionThreshold can dynamically modify, default is five, enter the request queue, and then carried out by the thread pool
3.2 Executing related attributes
Hystrix.com mand. Default. Execution. The isolation. The strategy: isolation strategy, the default Thread, can choose a Semaphore Semaphore
Hystrix.com mand. Default. Execution. The isolation. Thread. TimeoutInMilliseconds: timeout, the default 1000 ms
Hystrix.com mand. Default. Execution. A timeout. Enabled: whether to enable the timeout, the default is true
Hystrix.com mand. Default. Execution. The isolation. Thread. InterruptOnTimeout: whether the timeout time interrupt execution, the default is true
Hystrix.com mand. Default. Execution. The isolation. Semaphore. MaxConcurrentRequests: signal isolation strategy, allow maximum number of concurrent requests, 10 by default
3.3 Attributes related to degradation
Hystrix.com mand. Default. Fallback. Enabled by default is true
3.4 Fuse-related attributes
Hystrix.com mand. Default. CircuitBreaker. Enabled: whether to enable fuse true by default
Hystrix.com mand. Default. CircuitBreaker. RequestVolumeThreshold: 10 seconds, what is the number of requests to try to trigger fusing, 20 by default
Hystrix.com mand. Default. CircuitBreaker. ErrorThresholdPercentage: 10 seconds, the number of requests up to 20, abnormal ratio of 50% at the same time, will trigger the fusing, 50 by default
Hystrix.com mand. Default. CircuitBreaker. SleepWindowInMilliseconds: trigger after fusing, directly to refuse the request within 5 s, go down logic, 5 s after trying to lose half – open a small amount of traffic trying to restore, the default of 5000
Hystrix.com mand. Default. CircuitBreaker. ForceOpen: forced open the fuse
Hystrix.com mand. Default. CircuitBreaker. ForceClosed: forced to close the fuse
3.5 Metric attributes
Hystrix. Threadpool. Default. The metrics. RollingStats. TimeInMillisecond: thread pool statistical indicators of the time, the default, 10000 is 10 s
Hystrix. Threadpool. Default. The metrics. RollingStats. NumBuckets: the rolling window is divided into n buckets, 10 by default
Hystrix.com mand. Default. The metrics. RollingStats. TimeInMilliseconds: the command of the statistical time, whether the fuse will open, according to a rolling window statistics to calculate. If Rolling Window is set to 10000 ms, rolling Window will be divided into N buckets, and each bucket contains the statistics of The Times of success, failure, timeout and rejection. The default is 10000
Hystrix.com mand. Default. The metrics. RollingStats. NumBuckets set up a rolling window is divided into the number of, if numBuckets = 10, rolling window = 10000, The time of a bucket is then 1 second. Must match rolling Window % numberBuckets == 0. The default 10
Hystrix.com mand. Default. The metrics. RollingPercentile. Enabled if executes the enable index calculation and tracking, the default is true
Hystrix.com mand. Default. The metrics. RollingPercentile. TimeInMilliseconds set rolling the percentile window of time, the default of 60000
Hystrix.com mand. Default. The metrics. RollingPercentile. NumBuckets set rolling the percentile numberBuckets window. Same logic. The default 6
Hystrix.com mand. Default. The metrics. RollingPercentile. BucketSize if the bucket size = 100, window = 10 s, if are there 500 times in the 10 s, Only the last 100 executions are counted in the bucket. Increasing this value increases memory overhead as well as sorting overhead. The default is 100
Hystrix.com mand. Default. The metrics. HealthSnapshot. IntervalInMilliseconds record health snapshot (for success and error statistics green) intervals, the default 500 ms
3.6 Advanced Features
Hystrix.com mand. Default. RequestCache. Enabled is true
Hystrix.com mand. Default. RequestLog. Enabled logging to HystrixRequestLog, true by default
Hystrix. Collapser. Default. MaxRequestsInBatch single batch the maximum number of requests, to reach the trigger the batch number, the default Integer. MAX_VALUE
Hystrix. Collapser. Default. TimerDelayInMilliseconds trigger delay of the batch, can also create a batch of time + the value, the default 10
Hystrix. Collapser. Default. RequestCache. If enabled for HystrixCollapser. The execute () and HystrixCollapser. Queue () cache, true by default
4. Integration principle of Feign and Hystrix
4.1 Hystrix Generates dynamic proxies
Mainly through HystrixTaget instance methods of the target, first of all judge previously built Feign. Builder is Feign. Hystrix. HystrixFeign. Builder.
We then use the targetWithFallback method to get a separate FallbackFactory from the service-independent Spring container to get the instance of our defined Fallback object. It verifies that the fallback is ok and that the Fallback factory has returned a fall object that cannot be null
After through builder build method, according to the service information, parameters, such as building HystrixInvocationHandler instance, and set up a contract for HystrixDelegatingContract, This example is used to parse Hystrix annotations.
Then build ReflectiveFeign from its parent class Feign.Builder. In this case, the logic is the same as Feign used to build dynamic proxies.
4.2 Execution Process
A HystrixInvocationHadnler object is initialized when a dynamic proxy object is built. The constructor of this instance uses a toSetters method, which assemes the method with the corresponding key information.
static Map<Method, Setter> toSetters(SetterFactory setterFactory, Target
target, Set
methods)
{
Map<Method, Setter> result = new LinkedHashMap<Method, Setter>();
for (Method method : methods) {
method.setAccessible(true);
result.put(method, setterFactory.create(target, method));
}
return result;
}
public HystrixCommand.Setter create(Target
target, Method method) {
String groupKey = target.name();
String commandKey = Feign.configKey(target.type(), method);
return HystrixCommand.Setter
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupKey))
.andCommandKey(HystrixCommandKey.Factory.asKey(commandKey));
}
Copy the code
The main core key is that the create method, the target object is the information about the encapsulated service, and we can see from the source that a service name, which is the service name that we set in the @FeignClient annotation, As a groupKey, and the method name and method parameters in this interface as commandKey
A groupkey represents a thread pool, that is, the interface invocation requests contained in the service are completed by the thread pool. An interface in a service represents a commandKey, which belongs to the groupkey of the service
We then build an instance of HystrixCommand using the setterMethodMap, which is responsible for executing the logic, the general logic, through the run() Method, through the handler object bound to the Method object. There is also a method called getFallback, in which the logic is basically the same as above: get the corresponding Fallback object from the factory, get the corresponding handler from the handler factory, and execute the degraded method.
Execute () via HystrixCommand, queue(), toObservable().toblocking ().toFuture() To put it simply, it obtained the corresponding Future object from the thread pool corresponding to the service, and then wrapped the Future object, mainly because the original Future object could not cancel. Finally, it returned the wrapped object F, and judged whether the execution was completed by F. The command result is displayed
public Future<R> queue(a) {
// Get the Future of the service
final Future<R> delegate = toObservable().toBlocking().toFuture();
// Wrap the native Future
final Future<R> f = new Future<R>() {
...
}
if (f.isDone()) { // Check whether the execution is complete
try {
f.get();
return f; / / return
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throwhre; }}else {
throwExceptions.sneakyThrow(t); }}}return f;
}
Copy the code
In the toObservable method, an Observable is created, and inside it, Created terminateCommandCleanup, unsubscribeCommandCleanup, applyHystrixSemantics, wrapWithAllOnNextHooks, fireOnCompletedHook These objects, which are used for callbacks, are wrapped in Obserable objects, and the call method of those objects is invoked when the toBlocking() method is invoked.
After the toBlocking() method is invoked, the call() method of Func0 is actually called, which is the entry method to execute, and its internal implementation is relatively simple: The command state is changed from NOT_STARTED to OBSERVABLE_CHAIN_CREATE, and then the call logging is enabled. By default, the call logging is enabled, but not processed, and caching is disabled. In the end, a hystrixObservable is built, and the object created before is given to him. After creating an afterCache, the hystrixObservable is assigned to him, which is the returned Observable instance.
When it calls its callback, it calls the applyHystrixSemantics callback method, which is appliHystirxSemantics, and the short-circuiting semantics method says whether or not it can execute, if it can execute, The executeCommandAndObserve method is then called, where many more objects are built: markEmits, markOnCompleted, handleFallback, setRequestContext, and then determine whether timeout detection is enabled. Through executeCommandWithSpecifiedIsolation method of Func0. Call () method, modify some status value, Hystrix. StartCurrentThreadExecutingCommand (getCommandKey ()), in this way will commandKey YaRu stack, finally through getUserExecutionObservable method, Set the callback method, which executes the run() method, returns the final userObservable, and subscribes to the userObservable via the callback. The run() Method is called by the func0.call () callback in getExecutionObservable, and the invoke Method is invoked by the corresponding handler of the Method object for final execution.
Grass, the above is too messy, I will change it slowly, some of it is really hard to tidy up, then look at the picture. Follow up with more source code assistance
4.3 Thread Pool Related
In fact, the related thread pool constructor logic in HystrixInvocationHandler, whose invoke method builds HystrixCommand instances, calls the constructor of the parent class AbstractCommand
protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {
//
this.commandGroup = initGroupKey(group);
this.commandKey = initCommandKey(key, getClass());
this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
// This method is used for thread pool initialization
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); . . }Copy the code
The initThreadPool method above calls a getInstance method
private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
if (fromConstructor == null) {
// get the default implementation of HystrixThreadPool
return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
} else {
returnfromConstructor; }}Copy the code
HystrixThreadPoolDefault is used to initialize the thread pool from this class if the threadPoolKey does not have a corresponding thread pool. The threadPools attribute is a concurrentHashMap
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
// get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if(previouslyCached ! =null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if(! threadPools.containsKey(key)) {// This is the core
threadPools.put(key, newHystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); }}return threadPools.get(key);
}
Copy the code
Then go down, to see how to build HystrixThreadPoolDefault, or access to the relevant attribute information, first by concurrencyStrategy. GetThreadPool () method, the implementation of a thread pool to create
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.queueSize = properties.maxQueueSize().get();
this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
concurrencyStrategy.getThreadPool(threadPoolKey, properties),
properties);
this.threadPool = this.metrics.getThreadPool();
this.queue = this.threadPool.getQueue();
/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}
Copy the code
Now it’s pretty clear what some of the parameters mean, and I’m going to draw a table in a second, and I’m going to build queues and thread pools based on those parameters, and by default, CoreSize = 10, maxQueueSize = -1 this indirectly causes BlockingQueue to be of type SynchronousQueue, which means it is not queued by default
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
final int dynamicCoreSize = threadPoolProperties.coreSize().get();
final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) {
final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
if (dynamicCoreSize > dynamicMaximumSize) {
logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " +
dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
} else {
return newThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); }}else {
return newThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); }}Copy the code
allowMaximumSizeToDivergeFromCoreSize | True /false indicates whether automatic expansion is allowed |
---|---|
keepAliveTime | When this parameter is enabled, new threads are automatically created, and new threads are reclaimed when they are idle for longer than this time |
dynamicMaximumSize | If the preceding parameter is enabled, the maximum number of threads can be expanded |
maxQueueSize | How many requests can a queued queue hold? The default value is -1 |
BlockingQueue | Queue, because that parameter defaults to -1, this queue defaults to SynchronousQueue, which is not queued; LinkedBlockingQueue(maxQueueSize) if the value > 0 |
dynamicCoreSize | The default is 10, and there are ten threads in the thread pool |
Finally, to a small map, deepen the understanding, the map is not big, live to see, the final optimization in the late version
Actually executeCommandWithSpecifiedIsolation the call () method, carries on the execution of the observables, ordered a subscribeOn () method, the core of this code is no longer in the call, Instead, it’s in getScheduler()
subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call(a) {
returnproperties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; }}Copy the code
In this case, a new HystrixContextScheduler is created, which simply assigns some properties to it, and then a new instance of ThreadPoolScheduler is created
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig(); // Set some thread pool parameters
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
Copy the code
HystrixContextScheduler invokes the createWorker inside (), created HystrixContextSchedulerWorker, through its scheduler (), this method can judge whether the queue of the thread pool is full
@Override
public Subscription schedule(Action0 action) {
if(threadPool ! =null) {
if(! threadPool.isQueueSpaceAvailable()) {throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold."); }}return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
Copy the code
Mainly through! ThreadPool. IsQueueSpaceAvailable () to judge, logic is relatively simple, maxQueueSize < 0, direct return is full, if not, still have to determine: The amount of the current queue < queueSizeRejectionThreshold attribute values
public boolean isQueueSpaceAvailable(a) {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
returnthreadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get(); }}Copy the code
Judgment after, full of error directly, not full began by ThreadPoolWorker schedule method, submitted the task, before calling the method is to create a HystrixContexSchedulerAction object, This is the ScheduledAction below.
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// This is internal RxJava API but it is too useful.
ScheduledAction sa = newScheduledAction(action); subscription.add(sa); sa.addParent(subscription); ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor(); FutureTask<? > f = (FutureTask<? >) executor.submit(sa); sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
Copy the code
ToObservable ().toblocking ().toFuture() = toObservable().toblocking ().toFuture()
Then there is the execution, and continue to go, will find that the last or executeCommandWithSpecifiedIsolation Func0. In the () call () method, the call () method in the userObservable, Execute command’s run method, and execute this logic, which I wrote earlier, although it’s messy.
* * * *
4.4 the timeout
The main logic of this timeout, after creating observables, give the observables added a HystrixObservableTimeoutOperator instance, it’s the call () method is the key, the main, If a TimeListener is executed, the tick() method will handle the timeout logic. If the state is NOT_EXECUTED, change it to TIME_OUT and throw an exception
TimerListener listener = new TimerListener() {
@Override
public void tick(a) {
// if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
// otherwise it means we lost a race and the run() execution completed or did not start
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// report timeout failure
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// shut down the original request
s.unsubscribe();
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
@Override
public void run(a) {
child.onError(newHystrixTimeoutException()); }}); timeoutRunnable.run();//if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout}}@Override
public int getIntervalTimeInMilliseconds(a) {
returnoriginalCommand.properties.executionTimeoutInMilliseconds().get(); }};Copy the code
The TimeListener is then added to the thread pool. If it times out, it has its own thread pool (coreSize = 4 by default) that has the Runnable thread class to call the tick() method above. And every time we set the timeout time to execute the task, and finally after the task is executed, if there is no timeout, the scheduled task will be cleared.
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
// add the listener
Runnable r = new Runnable() {
@Override
public void run(a) {
try {
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e); }}}; ScheduledFuture<? > f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);return new TimerReference(listener, f);
}
Copy the code
4.5 a fuse
A HystrixCircuitBreaker will initialize a HystrixCircuitBreaker when HystrixCommand is created. The circuit breaker will initialize a HystrixCircuitBreaker when HystrixCommand is created. The circuit breaker will initialize a HystrixCircuitBreaker when HystrixCommand is created. Ensure that each interface in each service has its own fuse
Go through HystrixCircuitBreakerImpl build fuse, Subscription s = subscribeToStream (); Is the core of judgment logic.
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;
//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}
Copy the code
The main idea here is to use the subscriber, subscription metrics statistics, and the onNext() callback to determine whether to turn on the fuse.
First of all to ensure that within a time window (the default 10 s), the total number of requests is greater than the circuitBreakerRequestVolumeThreshold 20 (the default), then to judge, Request is greater than percentage of abnormal circuitBreakerErrorThresholdPercentage 50% (the default), if all meet, Set status.compareAndSet(status.closed, status.open) to OPEN
private Subscription subscribeToStream(a) {
/* * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream */
return metrics.getHealthCountsStream()
.observe()
.subscribe(newSubscriber<HealthCounts>() { ... .@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if(status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); }}}}});Copy the code
When the fuse is turned on, it checks whether or not to use the fuse in the applyHystrixSemantics callback of the Observable
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// mark that we're starting execution on the ExecutionHook
// if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
executionHook.onStart(_cmd);
/* determine if we're allowed to execute */
if (circuitBreaker.attemptExecution()) {
/ / todo
} else {
// Todo is demoted}}Copy the code
The default is -1, but the fuse is set to the current time when it is on. There is also an isAfterSleepWindow () method that checks if the fuse is on. 5 s is greater than the circuitBreakerSleepWindowInMilliseconds (the default), facilitate subsequent set to HAFL_OPEN, retry to try again, restore close the circuit breaker.
@Override
public boolean attemptExecution(a) {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
if (isAfterSleepWindow()) {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false; }}else {
return false; }}}private boolean isAfterSleepWindow(a) {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}
Copy the code
In terms of the above judgment, if the current time exceeds the opening time of the fuse + the interval we set, the fuse will be set to HASL_OPEN. At this time, the fuse will not be broken, and a normal process will be tried. If the normal process fails again, Can be carried through the call () callback handleFallback circuitBreaker, markNonSuccess (); , the logic is very simple, is to set the fuse to OPEN, update the current timestamp
@Override
public void markNonSuccess(a) {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//This thread wins the race to re-open the circuit - it resets the start time for the sleep windowcircuitOpened.set(System.currentTimeMillis()); }}Copy the code
If the implementation is successful, will pass the callback markEmits circuitBreaker, markSuccess (); Turn off the fuse
@Override
public void markSuccess(a) {
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if(previousSubscription ! =null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L); }}Copy the code