Series of articles:
SpringCloud source code series (1) – Registry Eureka: initialization
SpringCloud source code series (2) – Registry Eureka: service registration and renewal
SpringCloud source code series (3) – Registry Eureka: fetching the registry
SpringCloud source code series (4) – Registry Eureka: service offline, expiration, and self-preservation mechanism
SpringCloud source code series (5) – Registry Eureka: EurekaServer cluster
SpringCloud source code series (6) – Registry Eureka: summary
SpringCloud source code series (7) – Load balancing Ribbon: RestTemplate
SpringCloud source code series (8) – Load balancing Ribbon: core principles
SpringCloud source code series (9) – Load balancing Ribbon: core components and configuration
SpringCloud source code series (10) – Load balancing Ribbon: HTTP client component
SpringCloud source code series (11) – Load balancing Ribbon: retries and summary
SpringCloud source code series (12) – Service invocation Feign: basic usage
SpringCloud source code series (13) – Service invocation Feign: scanning @FeignClient-annotated interfaces
SpringCloud source code series (14) – Service invocation Feign: building dynamic proxies for @FeignClient interfaces
SpringCloud source code series (15) – Service invocation Feign: load-balanced requests with Ribbon
SpringCloud source code series (16) – Circuit breaker Hystrix: basic introduction
SpringCloud source code series (17) – Circuit breaker Hystrix: obtaining the Observable to execute and subscribe to
Thread pool isolation execution principle
Hystrix thread pool initialization
When a HystrixCommand is constructed, its Hystrix thread pool (HystrixThreadPool) is initialized. The initialization logic lives in the constructor of the default implementation class, HystrixThreadPoolDefault, which is also the core component of thread pool scheduling.
The ThreadPoolExecutor itself is created by concurrencyStrategy.getThreadPool(threadPoolKey, properties). The queue size, queueSize, defaults to -1, and queue is simply the work queue obtained from that ThreadPoolExecutor, so to understand how the pool is built we need to look at concurrencyStrategy.getThreadPool.
class HystrixThreadPoolDefault implements HystrixThreadPool {
    private final HystrixThreadPoolProperties properties;
    private final BlockingQueue<Runnable> queue;
    private final ThreadPoolExecutor threadPool;
    private final HystrixThreadPoolMetrics metrics;
    private final int queueSize;

    public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
        // Thread pool configuration
        this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
        // Concurrency strategy
        HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        // Queue size, default -1
        this.queueSize = properties.maxQueueSize().get();
        // Hystrix thread pool metrics
        this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
                concurrencyStrategy.getThreadPool(threadPoolKey, properties), // Create the thread pool
                properties);
        // By default this is a pool backed by a SynchronousQueue, with core and maximum thread counts both 10
        this.threadPool = this.metrics.getThreadPool();
        // SynchronousQueue by default
        this.queue = this.threadPool.getQueue();
        /* strategy: HystrixMetricsPublisherThreadPool */
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
    }
}
The getThreadPool method builds the ThreadPoolExecutor from the configuration. It tells us the following:
- getThreadFactory names threads in the format "hystrix-{threadPoolKey}-{number}", which matches the Hystrix thread names we see in the logs.
- getBlockingQueue is called to obtain the work queue. maxQueueSize defaults to -1, in which case the queue is a SynchronousQueue. A SynchronousQueue has no capacity, so by default tasks are never queued: if the pool cannot run a task immediately, the task is rejected.
- If the pool is allowed to grow to the maximum thread count (allowMaximumSizeToDivergeFromCoreSize) and the maximum is greater than the core size, the pool keeps creating threads once all core threads are busy, up to the maximum. Otherwise the pool is created with the core thread count equal to the maximum thread count.
To summarize, by default dynamicCoreSize and dynamicMaximumSize are 10, maxQueueSize is -1, and allowMaximumSizeToDivergeFromCoreSize is false.
So the ThreadPoolExecutor created by default has these characteristics:
- The number of core threads equals the maximum number of threads.
- The work queue is a SynchronousQueue, which holds no tasks.
- As a result, at most 10 threads work at the same time, and any further task is rejected.
- Thread names have the format "hystrix-{threadPoolKey}-{number}".
public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    // Name format: "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    // Whether the pool may grow from the core size up to the maximum size. Default is false
    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    // Number of core threads
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    // Keep-alive time of idle threads
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    // maxQueueSize defaults to -1
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    // maxQueueSize <= 0 ==> SynchronousQueue
    // maxQueueSize > 0  ==> LinkedBlockingQueue(maxQueueSize)
    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
    // Growing up to the maximum number of threads is allowed
    if (allowMaximumSizeToDivergeFromCoreSize) {
        // Maximum number of threads
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    }
    // Growing is not allowed: core size == maximum size. This is the default path
    else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
    return new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        }
    };
}

public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
    // maxQueueSize defaults to -1
    if (maxQueueSize <= 0) {
        return new SynchronousQueue<Runnable>();
    } else {
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }
}
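The defaults above can be reproduced with the plain JDK. The sketch below is a minimal illustration, not Hystrix code: DefaultPoolSketch and its methods are hypothetical names. It builds a pool the way the default branch of getThreadPool does (core size equal to maximum size, a SynchronousQueue when maxQueueSize <= 0, daemon threads named "hystrix-{key}-{n}") and counts how many blocking tasks the pool accepts before it starts rejecting.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class DefaultPoolSketch {

    // Same rule as getBlockingQueue(): no capacity when maxQueueSize <= 0,
    // otherwise a bounded LinkedBlockingQueue.
    static BlockingQueue<Runnable> queueFor(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<>();
        }
        return new LinkedBlockingQueue<>(maxQueueSize);
    }

    // Pool shaped like the default Hystrix pool: core == max, daemon threads
    // named "hystrix-{key}-{n}". The 1-minute keep-alive is an illustrative value.
    static ThreadPoolExecutor newPool(String key, int coreSize, int maxQueueSize) {
        AtomicInteger threadNumber = new AtomicInteger(0);
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "hystrix-" + key + "-" + threadNumber.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        return new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES,
                queueFor(maxQueueSize), factory);
    }

    // Submits tasks that block until released and counts how many the pool accepted;
    // the rest are rejected by the default AbortPolicy.
    static int acceptedCount(int coreSize, int maxQueueSize, int attempts) {
        ThreadPoolExecutor pool = newPool("demo", coreSize, maxQueueSize);
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // threads and queue are saturated
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(acceptedCount(10, -1, 20)); // 10: SynchronousQueue, no waiting room
        System.out.println(acceptedCount(10, 5, 20));  // 15: 10 running + 5 queued
    }
}
```

With maxQueueSize = -1 the eleventh concurrent task is rejected straight away; giving the queue a capacity of 5 lets five more tasks wait before rejection starts.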
Scheduling tasks on the thread pool
Creating the scheduler
In the previous article we saw that the Observable is scheduled in the executeCommandWithSpecifiedIsolation(_cmd) method, and the entry point is the subscribeOn(Scheduler scheduler) call. The scheduler comes from the Hystrix thread pool initialized in the previous section: threadPool.getScheduler(Func0<Boolean> shouldInterruptThread) returns a HystrixContextScheduler.
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // Thread pool isolation
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // ...
                // Get the Observable of run()
                return getUserExecutionObservable(_cmd);
            }
        })
        .doOnTerminate(new Action0() { ... })
        .doOnUnsubscribe(new Action0() { ... })
        // Run the Observable returned by defer asynchronously on a scheduler. The scheduler ==> HystrixContextScheduler
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                // Determine whether to interrupt the executing thread
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    }
    // Semaphore isolation
    else {
        // ...
    }
}
Next, let's see how the scheduler is created.
getScheduler first calls touchConfig() to apply the latest configuration to the ThreadPoolExecutor, which means the pool parameters can be changed at runtime. Only corePoolSize, maximumPoolSize and keepAliveTime can be changed this way, however, so the pool's queue cannot be resized dynamically.
getScheduler then creates a HystrixContextScheduler. Its constructor creates an internal actualScheduler whose actual type is ThreadPoolScheduler, so we can expect the task to be scheduled by ThreadPoolScheduler in the end.
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // Dynamically apply the current thread pool configuration
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;
    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }
    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        threadPool.setCorePoolSize(dynamicCoreSize);
        threadPool.setMaximumPoolSize(dynamicMaximumSize);
    }
    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
    this.concurrencyStrategy = concurrencyStrategy;
    this.threadPool = threadPool;
    // actualScheduler => rx.Scheduler => ThreadPoolScheduler
    this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
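The runtime resizing done by touchConfig() relies only on standard ThreadPoolExecutor setters. The sketch below shows the same idea on a bare JDK pool; ResizeSketch.resize is a hypothetical helper, not Hystrix API. It orders the two setters so the core size never exceeds the maximum, because since JDK 9 setCorePoolSize throws IllegalArgumentException for a core size above the current maximum, and setMaximumPoolSize throws for a maximum below the current core size. ThreadPoolExecutor has no setter for its work queue, which is why the queue cannot be resized this way.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ResizeSketch {

    // Applies new sizes to a live pool, keeping core <= max at every step.
    static void resize(ThreadPoolExecutor pool, int core, int max, long keepAliveMinutes) {
        if (core > max) {
            throw new IllegalArgumentException("core must not exceed max");
        }
        if (core > pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(max); // raise the ceiling first
            pool.setCorePoolSize(core);
        } else {
            pool.setCorePoolSize(core);   // lower (or keep) the floor first
            pool.setMaximumPoolSize(max);
        }
        pool.setKeepAliveTime(keepAliveMinutes, TimeUnit.MINUTES);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new SynchronousQueue<>());
        resize(pool, 20, 20, 2);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 20/20
        resize(pool, 5, 8, 2);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // 5/8
        pool.shutdown();
    }
}
```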
Task scheduling and execution
Below is the source code of the thread pool scheduling process. Creating the scheduler and scheduling a task look like layers of nested proxies:
- The process starts by creating the Hystrix context scheduler, HystrixContextScheduler, but the scheduler that does the actual work is ThreadPoolScheduler.
- Once the scheduler exists, a worker is created: HystrixContextScheduler.createWorker() creates a Hystrix context worker, HystrixContextSchedulerWorker, which wraps a ThreadPoolWorker created by ThreadPoolScheduler.
- Finally the task is scheduled: HystrixContextSchedulerWorker.schedule is called first, and it in turn delegates to ThreadPoolWorker to do the scheduling.
In short, the real scheduling logic is in HystrixContextSchedulerWorker.schedule and ThreadPoolWorker.schedule.
HystrixContextScheduler ::
public Worker createWorker() {
    // actualScheduler.createWorker() ==> ThreadPoolWorker
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

ThreadPoolScheduler ::
public Worker createWorker() {
    return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}

HystrixContextSchedulerWorker ::
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    // If the queue is full, throw an exception
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    // Scheduling worker ==> ThreadPoolWorker
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}

ThreadPoolWorker ::
public Subscription schedule(final Action0 action) {
    // Wrap the action in a ScheduledAction
    ScheduledAction sa = new ScheduledAction(action);
    subscription.add(sa);
    sa.addParent(subscription);
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // Submit the task to the thread pool
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // Cancel on unsubscribe ==> shouldInterruptThread.call() decides whether to interrupt the task on timeout
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    return sa;
}
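The nesting of HystrixContextSchedulerWorker around ThreadPoolWorker is essentially a decorator chain. The JDK-only sketch below illustrates the pattern under stated assumptions: the outer ContextWorker captures a value from the calling thread and restores it on the pool thread before delegating, roughly the role HystrixContexSchedulerAction plays for the request context, while the inner PoolWorker just submits to an executor. REQUEST_CONTEXT, ContextWorker and PoolWorker are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class LayeredWorkerSketch {

    // Hypothetical stand-in for a request-scoped context.
    static final ThreadLocal<String> REQUEST_CONTEXT = new ThreadLocal<>();

    interface Worker {
        Future<?> schedule(Runnable action);
    }

    // Inner layer: hands the task to the executor (cf. ThreadPoolWorker).
    static final class PoolWorker implements Worker {
        private final ExecutorService executor;
        PoolWorker(ExecutorService executor) { this.executor = executor; }
        @Override public Future<?> schedule(Runnable action) { return executor.submit(action); }
    }

    // Outer layer: captures the caller's context, restores it on the pool thread,
    // then delegates to the wrapped worker (cf. HystrixContextSchedulerWorker).
    static final class ContextWorker implements Worker {
        private final Worker delegate;
        ContextWorker(Worker delegate) { this.delegate = delegate; }
        @Override public Future<?> schedule(Runnable action) {
            final String captured = REQUEST_CONTEXT.get();
            return delegate.schedule(() -> {
                String previous = REQUEST_CONTEXT.get();
                REQUEST_CONTEXT.set(captured);
                try {
                    action.run();
                } finally {
                    REQUEST_CONTEXT.set(previous);
                }
            });
        }
    }

    // Runs one action through both layers and returns the context seen on the pool thread.
    static String contextSeenByPoolThread(String callerContext) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            REQUEST_CONTEXT.set(callerContext);
            String[] seen = new String[1];
            Worker worker = new ContextWorker(new PoolWorker(executor));
            worker.schedule(() -> seen[0] = REQUEST_CONTEXT.get()).get();
            return seen[0];
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            REQUEST_CONTEXT.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(contextSeenByPoolThread("req-42")); // req-42
    }
}
```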
For ease of understanding, the source code is abstracted into the following scheduling flow chart:
Rejection when the thread pool is full
HystrixContextSchedulerWorker.schedule first calls threadPool.isQueueSpaceAvailable() to check whether the thread pool queue has space available. If it does not, a RejectedExecutionException is thrown; if it does, the task is scheduled through ThreadPoolWorker.
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    // If the queue is full, throw an exception
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    // Scheduling worker ==> ThreadPoolWorker
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
Now let's look at the logic of isQueueSpaceAvailable().
By default queueSize is -1, and when queueSize <= 0 the method reports that space is available, even though the queue is actually a SynchronousQueue with no capacity. Returning true simply hands the task to the thread pool; if all worker threads are busy, the pool itself rejects the task, as the source comment explains.
If a queue size is configured (maxQueueSize > 0), the queue is considered to have space only while the number of queued tasks is below queueSizeRejectionThreshold (default 5). This may look odd at first: why not simply check whether the queue has capacity left, i.e. threadPool.getQueue().remainingCapacity() > 0?
The Javadoc explains why: the goal is dynamic resizing. The queue's capacity cannot be changed at runtime, so to make the effective queue length adjustable Hystrix uses a second setting, queueSizeRejectionThreshold, to limit how many tasks may wait. For example, with maxQueueSize set to 100 and queueSizeRejectionThreshold left at its default of 5, at most 5 tasks can be queued; raising queueSizeRejectionThreshold to 20 at runtime lets up to 20 tasks queue. That is why it makes sense to configure maxQueueSize larger than queueSizeRejectionThreshold. This design is worth learning from.
/**
 * Whether the threadpool queue has space available according to the queueSizeRejectionThreshold settings.
 *
 * Note that the queueSize is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically.
 * The data structure is static, so this does not make sense as a dynamic lookup.
 * The queueSizeRejectionThreshold can be dynamic (up to queueSize), so that should
 * still get checked on each invocation.
 *
 * If a SynchronousQueue implementation is used (maxQueueSize <= 0), it always returns 0 as the size so this would always return true.
 */
@Override
public boolean isQueueSpaceAvailable() {
    // The queue size defaults to -1
    if (queueSize <= 0) {
        // we don't have a queue so we won't look for space but instead let the thread-pool reject or not
        return true;
    } else {
        // Space is available while the queued task count is below queueSizeRejectionThreshold (default 5)
        return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
    }
}
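The interplay between the fixed queue capacity and the adjustable threshold can be sketched with the plain JDK. In this hypothetical RejectionThresholdSketch (not Hystrix code), the pool has one thread and a LinkedBlockingQueue of capacity 100, and submissions are refused once the number of waiting tasks reaches a threshold that starts at 5 and is raised to 20 at runtime, mirroring the example in the text. As in the source above, the check and the submit are separate steps, so under concurrent submission the threshold is only approximate.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectionThresholdSketch {

    private final ThreadPoolExecutor pool;
    private final int queueSize;              // fixed once the queue is built
    private volatile int rejectionThreshold;  // may be changed at runtime

    RejectionThresholdSketch(int coreSize, int maxQueueSize, int rejectionThreshold) {
        this.queueSize = maxQueueSize;
        this.rejectionThreshold = rejectionThreshold;
        BlockingQueue<Runnable> queue;
        if (maxQueueSize <= 0) {
            queue = new SynchronousQueue<>();
        } else {
            queue = new LinkedBlockingQueue<>(maxQueueSize);
        }
        this.pool = new ThreadPoolExecutor(coreSize, coreSize, 1, TimeUnit.MINUTES, queue);
    }

    void setRejectionThreshold(int threshold) {
        this.rejectionThreshold = threshold;
    }

    // Same rule as isQueueSpaceAvailable(): without a real queue, let the pool decide;
    // otherwise compare the number of waiting tasks with the current threshold.
    boolean isQueueSpaceAvailable() {
        if (queueSize <= 0) {
            return true;
        }
        return pool.getQueue().size() < rejectionThreshold;
    }

    // Check first, then submit (not atomic, as noted above).
    void submit(Runnable task) {
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("queue is at rejection threshold");
        }
        pool.execute(task);
    }

    // Submits blocking tasks until the first rejection and returns how many got in.
    int fillUntilRejected(CountDownLatch release, int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            try {
                submit(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            } catch (RejectedExecutionException e) {
                break;
            }
        }
        return accepted;
    }

    // maxQueueSize = 100, threshold 5, then raised to 20 at runtime.
    static int[] demo() {
        RejectionThresholdSketch sketch = new RejectionThresholdSketch(1, 100, 5);
        CountDownLatch release = new CountDownLatch(1);
        try {
            int first = sketch.fillUntilRejected(release, 50);  // 1 running + 5 queued
            sketch.setRejectionThreshold(20);
            int second = sketch.fillUntilRejected(release, 50); // 15 more queued
            return new int[] {first, second};
        } finally {
            release.countDown();
            sketch.pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " then " + r[1]); // 6 then 15
    }
}
```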
The command is submitted to the thread pool for execution
Finally, look at ThreadPoolWorker.schedule. This is where the task is actually submitted to the thread pool, and this step is what really implements thread pool isolation.
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }
    // This is internal RxJava API but it is too useful.
    ScheduledAction sa = new ScheduledAction(action);
    subscription.add(sa);
    sa.addParent(subscription);
    // Thread pool
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // Submit the task to the thread pool
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // Cancel on unsubscribe ==> shouldInterruptThread.call() decides whether to interrupt the task on timeout
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
    return sa;
}
At the end of schedule, a FutureCompleterWithConfigurableInterrupt subscription is added to the ScheduledAction submitted to the pool. When that subscription is unsubscribed, for example because the command timed out, it removes the task from the pool and cancels it.
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }

    @Override
    public void unsubscribe() {
        executor.remove(f);
        // Decide whether to interrupt the running task
        if (shouldInterruptThread.call()) {
            f.cancel(true);
        } else {
            f.cancel(false);
        }
    }
}
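What cancel(true) versus cancel(false) means for a running task can be seen with a plain FutureTask. In the sketch below, CancelSketch is a hypothetical class and cancel() mirrors the body of unsubscribe() above. A task that sleeps is cancelled while it runs: with interruption the worker thread receives an InterruptedException, without it the task simply runs to the end, although the Future reports itself as cancelled either way.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CancelSketch {

    // Same steps as unsubscribe(): drop the task if it is still queued,
    // then cancel it, interrupting the worker thread only when asked to.
    static void cancel(ThreadPoolExecutor executor, FutureTask<?> f, boolean interrupt) {
        executor.remove(f);
        f.cancel(interrupt);
    }

    // Starts a sleeping task, cancels it while it runs and reports whether
    // the worker thread observed an interrupt.
    static boolean runningTaskWasInterrupted(boolean interrupt) {
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new SynchronousQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        boolean[] interrupted = new boolean[1];
        FutureTask<?> f = (FutureTask<?>) executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                interrupted[0] = true;
            } finally {
                finished.countDown();
            }
        });
        try {
            started.await();
            cancel(executor, f, interrupt);
            finished.await();
            return interrupted[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runningTaskWasInterrupted(true));  // true
        System.out.println(runningTaskWasInterrupted(false)); // false
    }
}
```

executor.remove(f) only has an effect while the task is still waiting in the queue; once it is running, cancellation is all that remains.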
Timeout detection interrupts the thread
Timeout handler
In the earlier analysis of executeCommandAndObserve(_cmd) we saw the code below. The Observable returned by executeCommandWithSpecifiedIsolation(_cmd) applies the configured isolation strategy. If timeouts are enabled (the default), a HystrixObservableTimeoutOperator is then lifted onto it, and this operator is evidently what controls the timeout.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // ...
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        // HystrixObservableTimeoutOperator controls the timeout
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // Register the subscription callbacks
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
Now look at HystrixObservableTimeoutOperator. It returns a new Subscriber that wraps the subscriber of the original Observable.
- First it defines a HystrixContextRunnable callback that emits a HystrixTimeoutException.
- Next it defines a TimerListener. The listener's interval, returned by getIntervalTimeInMilliseconds(), is the command execution timeout.
- The main job of the TimerListener is to update the command's isCommandTimedOut status once the timeout has elapsed: if isCommandTimedOut is still NOT_EXECUTED, it is changed to TIMED_OUT, the task is cancelled, and a timeout exception is emitted.
- The TimerListener is then registered with HystrixTimer, so the core timing logic must be in HystrixTimer.
- Finally a Subscriber is defined; on completion it updates isCommandTimedOut and clears the TimerListener.
isCommandTimedOut is a TimedOutStatus with three states: NOT_EXECUTED, COMPLETED and TIMED_OUT.
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        child.add(s);
        // The timeout callback emits HystrixTimeoutException
        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
            @Override
            public void run() {
                child.onError(new HystrixTimeoutException());
            }
        });
        // Timeout listener
        TimerListener listener = new TimerListener() {
            // tick() is called once per interval
            @Override
            public void tick() {
                // Update the status once the timeout has elapsed
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // Record the TIMEOUT event
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // Stop the original request
                    s.unsubscribe();
                    // Emit the timeout exception
                    timeoutRunnable.run();
                }
            }

            // The interval: the command execution timeout by default
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };
        // Register the listener and start timing
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        // Attach it to the original command
        originalCommand.timeoutTimer.set(tl);
        Subscriber<R> parent = new Subscriber<R>() {
            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    tl.clear(); // Remove the TimerListener
                    child.onCompleted();
                }
            }
            // ...
            private boolean isNotTimedOut() {
                // Once the task has finished, isCommandTimedOut is set to COMPLETED
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED
                        || originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };
        s.add(parent);
        return parent;
    }
}
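The three-state TimedOutStatus works as a one-shot handshake between the timer and the completing command: whichever side moves the state away from NOT_EXECUTED first wins. The sketch below models only that handshake with an AtomicReference. The class and method names are hypothetical: onTimerTick() stands in for the compareAndSet in tick(), and onCompleted() for isNotTimedOut().

```java
import java.util.concurrent.atomic.AtomicReference;

final class TimeoutStateSketch {

    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    private final AtomicReference<TimedOutStatus> status =
            new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

    // Timer side: true only for the call that actually declares the timeout.
    boolean onTimerTick() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
    }

    // Command side: true if the result may still be delivered to the subscriber.
    boolean onCompleted() {
        return status.get() == TimedOutStatus.COMPLETED
                || status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }

    TimedOutStatus status() {
        return status.get();
    }

    public static void main(String[] args) {
        TimeoutStateSketch fast = new TimeoutStateSketch();
        System.out.println(fast.onCompleted() + " " + fast.onTimerTick() + " " + fast.status()); // true false COMPLETED

        TimeoutStateSketch slow = new TimeoutStateSketch();
        System.out.println(slow.onTimerTick() + " " + slow.onCompleted() + " " + slow.status()); // true false TIMED_OUT
    }
}
```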
Timeout handler scheduling
HystrixTimer's addTimerListener method is straightforward. It initializes the scheduler if needed and then schedules the TimerListener at a fixed rate; both the initial delay and the period are the command timeout returned by the TimerListener.
To sum up, TimerListener and HystrixTimer together detect whether a task has timed out. Once it has, a timeout exception is emitted, the task execution is cancelled, and the fallback logic is entered.
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    // Initialize the executor if needed: ScheduledThreadPoolExecutor
    startThreadIfNeeded();
    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                // Trigger the listener
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };
    // The timeout defaults to 1000 ms, so by default the task runs every 1000 ms
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r,
            listener.getIntervalTimeInMilliseconds(), // Initial delay
            listener.getIntervalTimeInMilliseconds(), // Period
            TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
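The scheduling side can be approximated with a JDK ScheduledThreadPoolExecutor. In this sketch, Listener is a hypothetical stand-in for TimerListener and run() simulates a command that either completes at once or never. The listener is scheduled at a fixed rate with the timeout as both the initial delay and the period, as addTimerListener does, and a tick moves the status to TIMED_OUT only if the command has not completed first. The catch block matters: if a periodic task throws, ScheduledThreadPoolExecutor suppresses all of its later executions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class TimerSketch {

    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    // Hypothetical listener shape modelled on TimerListener.
    interface Listener {
        void tick();
        int intervalMillis();
    }

    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);

    // Initial delay and period are both the listener's interval.
    ScheduledFuture<?> addListener(Listener listener) {
        Runnable r = () -> {
            try {
                listener.tick();
            } catch (RuntimeException e) {
                System.err.println("Failed while ticking listener: " + e);
            }
        };
        return scheduler.scheduleAtFixedRate(r, listener.intervalMillis(),
                listener.intervalMillis(), TimeUnit.MILLISECONDS);
    }

    // Simulates one command with the given timeout that completes immediately or never.
    static Status run(int timeoutMillis, boolean completesImmediately) {
        TimerSketch timer = new TimerSketch();
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CountDownLatch timedOut = new CountDownLatch(1);
        ScheduledFuture<?> ticket = timer.addListener(new Listener() {
            @Override
            public void tick() {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                    timedOut.countDown(); // Hystrix would unsubscribe and emit HystrixTimeoutException here
                }
            }

            @Override
            public int intervalMillis() {
                return timeoutMillis;
            }
        });
        try {
            if (completesImmediately && status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                ticket.cancel(false); // like tl.clear(): stop ticking once the command is done
            }
            timedOut.await(timeoutMillis * 3L, TimeUnit.MILLISECONDS);
            return status.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ticket.cancel(false);
            timer.scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(100, false)); // TIMED_OUT
        System.out.println(run(100, true));  // COMPLETED
    }
}
```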
Fallback callbacks
Execution error types
In the Hystrix basics article we listed six types of Hystrix execution errors, as shown in the figure below. All of them except BAD_REQUEST trigger the fallback method.
AbstractCommand contains the following six error handling methods, one for each of Hystrix's six error types. When we analyzed the execution flow of HystrixCommand, we saw that many callbacks are registered, and the error callback always ends up calling one of these methods to handle the error.
Note that apart from handleBadRequestByEmittingError(), which handles BAD_REQUEST, the other five methods all end up calling getFallbackOrThrowException to obtain the fallback Observable, which corresponds to the figure above. In other words, getFallbackOrThrowException encapsulates the fallback handling for these errors.
// Semaphore rejection fallback
private Observable<R> handleSemaphoreRejectionViaFallback() {
    Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
    executionResult = executionResult.setExecutionException(semaphoreRejectionException);
    eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}

// Short-circuit fallback
private Observable<R> handleShortCircuitViaFallback() {
    eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
    Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
    executionResult = executionResult.setExecutionException(shortCircuitException);
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
            "short-circuited", shortCircuitException);
}

// Thread pool rejection fallback
private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
    eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
    threadPool.markThreadRejection();
    return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION,
            "could not be queued for execution", underlying);
}

// Timeout fallback
private Observable<R> handleTimeoutViaFallback() {
    return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
            "timed-out", new TimeoutException());
}

// Failure fallback
private Observable<R> handleFailureViaFallback(Exception underlying) {
    // ...
    eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
    return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
}

// BAD_REQUEST handling: no fallback, the error is emitted directly
private Observable<R> handleBadRequestByEmittingError(Exception underlying) {
    Exception toEmit = underlying;
    // ...
    return Observable.error(toEmit);
}
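The split between BAD_REQUEST and the other five error types amounts to a simple dispatch rule. The sketch below is illustrative only: the enum and method are not Hystrix API, although the constants reuse the HystrixEventType names seen in the source above. Every error type except BAD_REQUEST yields the fallback value; BAD_REQUEST rethrows the original exception to the caller.

```java
import java.util.function.Supplier;

final class FallbackDispatchSketch {

    enum ErrorType { SEMAPHORE_REJECTED, SHORT_CIRCUITED, THREAD_POOL_REJECTED, TIMEOUT, FAILURE, BAD_REQUEST }

    // Five error types go through the fallback path (cf. getFallbackOrThrowException);
    // BAD_REQUEST is surfaced as-is (cf. handleBadRequestByEmittingError).
    static <R> R handle(ErrorType type, RuntimeException cause, Supplier<R> fallback) {
        if (type == ErrorType.BAD_REQUEST) {
            throw cause;
        }
        return fallback.get();
    }

    public static void main(String[] args) {
        for (ErrorType type : ErrorType.values()) {
            try {
                System.out.println(type + " -> " + handle(type, new IllegalArgumentException("bad input"), () -> "fallback"));
            } catch (IllegalArgumentException e) {
                System.out.println(type + " -> rethrown: " + e.getMessage());
            }
        }
    }
}
```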
Fallback handling
Now look at getFallbackOrThrowException, which handles the fallback. It tells us the following:
- A semaphore limits how many fallbacks run at once: getFallbackSemaphore() returns the semaphore, which by default is a TryableSemaphore with a limit of 10. The fallback method is only invoked after a permit has been acquired.
- With a permit in hand, getFallbackObservable() returns the fallback Observable. If the user overrides getFallback(), this is the Observable of getFallback(); otherwise a "No fallback available." exception is thrown.
- Finally, subscribing to the fallback Observable enters the fallback method.
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // Fallback enabled
    if (properties.fallbackEnabled().get()) {
        // Set the request context on the current thread
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { ... };
        // Fallback emission notification
        final Action1<R> markFallbackEmit = new Action1<R>() { ... };
        // Fallback completion notification
        final Action0 markFallbackCompleted = new Action0() { ... };
        // Handling of errors thrown by the fallback
        final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() { ... };
        // Fallback semaphore => TryableSemaphore
        final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // Release the semaphore permit
        final Action0 singleSemaphoreRelease = new Action0() { ... };
        // The fallback Observable
        Observable<R> fallbackExecutionChain;
        // Acquire a semaphore permit
        if (fallbackSemaphore.tryAcquire()) {
            try {
                // Has the user overridden getFallback()?
                if (isFallbackUserDefined()) {
                    executionHook.onFallbackStart(this);
                    fallbackExecutionChain = getFallbackObservable();
                } else {
                    // Throws: "No fallback available."
                    fallbackExecutionChain = getFallbackObservable();
                }
            } catch (Throwable ex) {
                fallbackExecutionChain = Observable.error(ex);
            }
            // Execute the fallback
            return fallbackExecutionChain
                    .doOnEach(setRequestContext)
                    .lift(new FallbackHookApplication(_cmd))
                    .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                    .doOnNext(markFallbackEmit)
                    .doOnCompleted(markFallbackCompleted)
                    .onErrorResumeNext(handleFallbackError)
                    .doOnTerminate(singleSemaphoreRelease)
                    .doOnUnsubscribe(singleSemaphoreRelease);
        } else {
            return handleFallbackRejectionByEmittingError();
        }
    } else {
        return handleFallbackDisabledByEmittingError(originalException, failureType, message);
    }
}
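The fallback semaphore can be approximated with java.util.concurrent.Semaphore and its non-blocking tryAcquire(). This is a minimal sketch, not the TryableSemaphore implementation: FallbackSemaphoreSketch is hypothetical, a null supplier stands in for a command that does not override getFallback(), and the rejection message is made up. The permit is always released in a finally block, much like singleSemaphoreRelease in the source above.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

final class FallbackSemaphoreSketch {

    // Caps concurrent fallback executions (the text gives 10 as the default).
    private final Semaphore fallbackSemaphore;

    FallbackSemaphoreSketch(int maxConcurrentFallbacks) {
        this.fallbackSemaphore = new Semaphore(maxConcurrentFallbacks);
    }

    // Non-blocking acquire, run the fallback if there is one, always release.
    <R> R executeFallback(Supplier<R> fallback) {
        if (!fallbackSemaphore.tryAcquire()) {
            throw new RuntimeException("fallback execution rejected");
        }
        try {
            if (fallback == null) {
                throw new UnsupportedOperationException("No fallback available.");
            }
            return fallback.get();
        } finally {
            fallbackSemaphore.release();
        }
    }

    // With one permit, a fallback that is still running blocks a second one.
    static String nestedCallOutcome() {
        FallbackSemaphoreSketch s = new FallbackSemaphoreSketch(1);
        return s.executeFallback(() -> {
            try {
                return s.executeFallback(() -> "inner ran");
            } catch (RuntimeException e) {
                return "inner rejected: " + e.getMessage();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(nestedCallOutcome()); // inner rejected: fallback execution rejected
    }
}
```

nestedCallOutcome() shows the limit deterministically, without extra threads: with a single permit, a fallback that tries to start another fallback while it is still running is rejected.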
A diagram summarizing how Hystrix thread pool isolation runs
The following diagram summarizes the process of submitting Hystrix tasks to the thread pool for execution.
How the circuit breaker works
Circuit breaker initialization
The HystrixCircuitBreaker is initialized in the AbstractCommand constructor. If circuitBreakerEnabled is true, the breaker is obtained from HystrixCircuitBreaker.Factory, and the default implementation class is HystrixCircuitBreakerImpl. If the circuit breaker is not enabled, the implementation is NoOpCircuitBreaker, which controls nothing.
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
        this.commandGroup, this.commandKey, this.properties, this.metrics);
private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // Get the default circuit breaker from the Factory
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        // Circuit breaker disabled: return an implementation that does nothing
        return new NoOpCircuitBreaker();
    }
}
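The enabled/disabled split in initCircuitBreaker can be sketched as follows. This is an illustration only: the interface and class names are hypothetical, and the per-command-key cache built with ConcurrentHashMap.computeIfAbsent is an assumption about how a factory like HystrixCircuitBreaker.Factory can hand out one shared breaker per command key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class BreakerFactorySketch {

    interface CircuitBreaker {
        boolean allowRequest();
    }

    // Allows everything and tracks nothing (cf. NoOpCircuitBreaker).
    static final class NoOpBreaker implements CircuitBreaker {
        @Override public boolean allowRequest() { return true; }
    }

    // Placeholder for the statistics-driven breaker (cf. HystrixCircuitBreakerImpl).
    static final class StatefulBreaker implements CircuitBreaker {
        final String commandKey;
        StatefulBreaker(String commandKey) { this.commandKey = commandKey; }
        @Override public boolean allowRequest() { return true; } // real decision logic omitted
    }

    // One shared breaker per command key, created lazily and atomically.
    private static final ConcurrentMap<String, CircuitBreaker> BY_COMMAND = new ConcurrentHashMap<>();

    static CircuitBreaker init(boolean enabled, CircuitBreaker fromConstructor, String commandKey) {
        if (!enabled) {
            return new NoOpBreaker();
        }
        if (fromConstructor != null) {
            return fromConstructor;
        }
        return BY_COMMAND.computeIfAbsent(commandKey, StatefulBreaker::new);
    }

    public static void main(String[] args) {
        System.out.println(init(true, null, "A") == init(true, null, "A")); // true: same key, same breaker
        System.out.println(init(false, null, "A") instanceof NoOpBreaker);  // true
    }
}
```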
The HystrixCircuitBreakerImpl constructor mainly needs HystrixCommandProperties and HystrixCommandMetrics. As you might expect, HystrixCommandMetrics is the breaker's data source: it counts the errors, timeouts, rejections and successes of Hystrix requests, and those counts decide whether the breaker opens.
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;
}
How the circuit breaker decides
Whether the breaker allows a request
When we analyzed applyHystrixSemantics(_cmd), we saw that it is the core entry point for applying Hystrix semantics. It first asks the circuit breaker whether the request may be executed; if the breaker does not allow it, the request is short-circuited and handed to the fallback method.
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // Does the circuit breaker allow the request?
    if (circuitBreaker.allowRequest()) {
        // ...
    } else {
        // Short circuit => fallback (degradation)
        return handleShortCircuitViaFallback();
    }
}
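Stripped of RxJava and isolation, the control flow of applyHystrixSemantics can be sketched synchronously. ShortCircuitDemo and its apply method are hypothetical. The BooleanSupplier stands in for circuitBreaker.allowRequest(), and the two Suppliers stand in for the command body and handleShortCircuitViaFallback().

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical synchronous stand-in for applyHystrixSemantics (no RxJava, no thread pool)
final class ShortCircuitDemo {
    static <R> R apply(BooleanSupplier allowRequest, Supplier<R> run, Supplier<R> fallback) {
        if (allowRequest.getAsBoolean()) {
            // Breaker allows it: the real code continues with isolation and execution
            return run.get();
        }
        // Breaker rejected it: short-circuit straight to the fallback
        return fallback.get();
    }
}
```

The property worth noticing is that a rejected request never reaches the remote call at all, which is what protects the struggling dependency.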
Now look at the allowRequest() method:
- First, check whether the breaker is forced open; if it is, the request is not allowed. In other words, we can manually open the circuit breaker in some situations to prevent cascading failures.
- Next, check whether the breaker is forced closed; if it is, the request is allowed. This means that even if the statistics say the breaker should be open, we can override it manually.
- If the breaker is neither forced open nor forced closed, the request is allowed when the breaker is not open, or when it is open but permits a single request through to test whether the called service has recovered. Otherwise, the request is rejected.
public boolean allowRequest() {
    // The circuit breaker is manually forced open
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // The circuit breaker is manually forced closed
    if (properties.circuitBreakerForceClosed().get()) {
        // Still run isOpen() so the statistics-based state keeps being evaluated
        isOpen();
        return true;
    }
    // Allow if the breaker is not open, or if a single test request is permitted
    return !isOpen() || allowSingleTest();
}
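The three rules above are evaluated in a fixed order, so force-open takes precedence over force-closed. The sketch below isolates that ordering. AllowRequestSketch is hypothetical; its two booleans stand in for the circuitBreakerForceOpen and circuitBreakerForceClosed properties, and its two BooleanSuppliers stand in for isOpen() and allowSingleTest().

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the decision order in allowRequest()
final class AllowRequestSketch {
    final boolean forceOpen;               // stands in for circuitBreakerForceOpen
    final boolean forceClosed;             // stands in for circuitBreakerForceClosed
    final BooleanSupplier isOpen;          // stands in for isOpen()
    final BooleanSupplier allowSingleTest; // stands in for allowSingleTest()

    AllowRequestSketch(boolean forceOpen, boolean forceClosed,
                       BooleanSupplier isOpen, BooleanSupplier allowSingleTest) {
        this.forceOpen = forceOpen;
        this.forceClosed = forceClosed;
        this.isOpen = isOpen;
        this.allowSingleTest = allowSingleTest;
    }

    boolean allowRequest() {
        if (forceOpen) {
            return false;              // checked first, so it wins over force-closed
        }
        if (forceClosed) {
            isOpen.getAsBoolean();     // evaluated for its side effects, result ignored
            return true;
        }
        // Short-circuit OR: allowSingleTest only runs when the breaker is open
        return !isOpen.getAsBoolean() || allowSingleTest.getAsBoolean();
    }
}
```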
Circuit breaker open
The isOpen() method is used to check whether the circuit breaker is on:
- First, check whether circuitOpen is true. If it is, the circuit breaker is open; in other words, the state of the breaker is controlled by the circuitOpen AtomicBoolean.
- If circuitOpen is false, the breaker is not open yet, so HystrixCommandMetrics is used to get the statistics (HealthCounts), and the following logic decides from HealthCounts whether to open the breaker.
- If total requests < the breaker's request volume threshold (default 20), the breaker is not opened.
- If error percentage < the breaker's error threshold percentage (default 50%), the breaker is not opened.
- Otherwise, the breaker is opened: circuitOpen is set to true and the opening time is recorded.
The Hystrix circuit breaker makes good use of atomic classes. circuitOpen is updated with compareAndSet, which is backed by a CAS instruction, so it can be updated safely even under high concurrency. Only the thread whose update succeeds records the opening time; if the CAS fails, another thread has already opened the breaker, and the opening time is left unchanged.
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
public boolean isOpen() {
    // circuitOpen records whether the breaker is already open
    if (circuitOpen.get()) {
        return true;
    }
    // Get the request statistics
    HealthCounts health = metrics.getHealthCounts();
    // Below the request volume threshold (default 20): the breaker cannot open yet
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        return false;
    }
    // Error percentage below the threshold (default 50%): keep the breaker closed
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // Open the circuit breaker
        if (circuitOpen.compareAndSet(false, true)) {
            // Record the breaker opening time
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            return true;
        }
    }
}
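Here is a self-contained sketch of the same trip decision, assuming the statistics are passed in as plain numbers rather than read from HealthCounts, and that the caller supplies the current time so the result is deterministic. TripSketch is a hypothetical class; the default thresholds of 20 requests and 50% come from the text.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified trip logic mirroring isOpen()
final class TripSketch {
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedAt = new AtomicLong(-1);
    private final int requestVolumeThreshold;    // Hystrix default per the text: 20
    private final int errorThresholdPercentage;  // Hystrix default per the text: 50

    TripSketch(int requestVolumeThreshold, int errorThresholdPercentage) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
    }

    boolean isOpen(long totalRequests, int errorPercentage, long nowMillis) {
        if (circuitOpen.get()) {
            return true;                 // already open
        }
        if (totalRequests < requestVolumeThreshold) {
            return false;                // not enough traffic to judge
        }
        if (errorPercentage < errorThresholdPercentage) {
            return false;                // healthy enough
        }
        // Only the thread that wins the CAS records the opening time
        if (circuitOpen.compareAndSet(false, true)) {
            openedAt.set(nowMillis);
        }
        return true;
    }

    long openedAtMillis() { return openedAt.get(); }
}
```

Because the check is "percentage < threshold means stay closed", the breaker trips when the error percentage is exactly equal to the threshold.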
Circuit breaker half open
Once the breaker is open, it waits for a sleep window and then lets a single request through to test whether the called service has recovered. This is the half-open state.
The logic lives in allowSingleTest():
- Check whether the current time is later than the breaker opening time plus the breaker sleep window (default 5000 ms). If so, the opening time is reset to the current time and the request is allowed.
- In other words, one test request is let through every 5000 ms; if it keeps failing, the breaker stays open. This is called half-open, i.e. the transition OPEN -> HALF-OPEN.
public boolean allowSingleTest() {
    // Time the breaker was opened (or last tested)
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // Breaker is open and current time > opening time + sleep window (default 5000 ms)
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // Reset the opening time to now; only the thread that wins this CAS gets the test request
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            return true;
        }
    }
    return false;
}
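The sleep-window check can be exercised in isolation by passing the clock in explicitly. SleepWindowSketch is a hypothetical class that starts in the open state; the strict > comparison and the CAS on the timestamp mirror the code above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of allowSingleTest() with an injected clock value
final class SleepWindowSketch {
    final AtomicBoolean circuitOpen = new AtomicBoolean(true); // starts open for the demo
    final AtomicLong openedOrLastTested;
    final long sleepWindowMillis;                              // Hystrix default per the text: 5000

    SleepWindowSketch(long openedAtMillis, long sleepWindowMillis) {
        this.openedOrLastTested = new AtomicLong(openedAtMillis);
        this.sleepWindowMillis = sleepWindowMillis;
    }

    boolean allowSingleTest(long nowMillis) {
        long last = openedOrLastTested.get();
        if (circuitOpen.get() && nowMillis > last + sleepWindowMillis) {
            // Moving the timestamp to "now" starts a new window; only one
            // concurrent caller can win this CAS and send the test request
            return openedOrLastTested.compareAndSet(last, nowMillis);
        }
        return false;
    }
}
```

Because the CAS moves the timestamp forward, a second caller arriving right after the first sees a fresh window and is rejected, so at most one test request goes out per window.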
Circuit breaker restoration
When does the circuit breaker close again? HystrixCircuitBreaker has a method called markSuccess() that closes the circuit breaker if it is currently open.
public void markSuccess() {
    if (circuitOpen.get()) {
        // Close the circuit breaker; the CAS winner also resets the metrics stream
        if (circuitOpen.compareAndSet(true, false)) {
            metrics.resetStream();
        }
    }
}
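The close-and-reset step can be sketched with two atomics. RecoverySketch is hypothetical, and windowErrors stands in for the metrics stream that metrics.resetStream() clears. The reason for the reset is presumably that stale errors recorded before recovery would otherwise re-open the breaker right away; the text does not state this explicitly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of markSuccess(): close the breaker and clear the statistics
final class RecoverySketch {
    final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    final AtomicLong windowErrors = new AtomicLong();  // stands in for the metrics stream

    void trip() { circuitOpen.set(true); }

    void recordError() { windowErrors.incrementAndGet(); }

    void markSuccess() {
        if (circuitOpen.get()) {
            // Only the CAS winner resets the statistics, so a burst of
            // successful responses resets them once, not once per response
            if (circuitOpen.compareAndSet(true, false)) {
                windowErrors.set(0);
            }
        }
    }
}
```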
In the executeCommandAndObserve(_cmd) method there are two callbacks, markEmits and markOnCompleted. markEmits runs when the command emits a value (for scalar commands) and markOnCompleted runs when the command completes (for non-scalar commands). Both call circuitBreaker.markSuccess() to tell the breaker that the request succeeded, and the breaker moves to the closed state: HALF-OPEN -> CLOSED.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // Called when the command emits a value
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (commandIsScalar()) {
                // ...
                // Tell the circuit breaker that the request succeeded
                circuitBreaker.markSuccess();
            }
        }
    };
    // Called when the command completes
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                // ...
                // Tell the circuit breaker that the request succeeded
                circuitBreaker.markSuccess();
            }
        }
    };
    // Handle errors by falling back
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {...};
    // Set the request context on the current thread
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        // On timeout, HystrixObservableTimeoutOperator takes over and throws a HystrixTimeoutException
        execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // Register the subscription callbacks
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
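The same wiring can be sketched without RxJava. In this hypothetical example CompletableFuture is swapped in for Observable: thenApply plays the role of the doOnNext/doOnCompleted success callbacks, exceptionally plays the role of onErrorResumeNext(handleFallback), and a counter stands in for circuitBreaker.markSuccess(). CallbackWiringSketch is not a Hystrix class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: CompletableFuture swapped in for RxJava's Observable
final class CallbackWiringSketch {
    // Stands in for circuitBreaker.markSuccess()
    final AtomicInteger successMarks = new AtomicInteger();

    <R> CompletableFuture<R> executeAndObserve(Supplier<R> run, Function<Throwable, R> fallback) {
        return CompletableFuture.supplyAsync(run)
                .thenApply(r -> {
                    successMarks.incrementAndGet(); // success path: tell the breaker
                    return r;
                })
                .exceptionally(fallback);           // error path: degrade to the fallback
    }
}
```

On failure the success callback is skipped entirely, so only genuinely successful requests can close the breaker.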
A diagram summarizes the working mechanism of a circuit breaker
Finally, a diagram is used to summarize the working mechanism of the circuit breaker.