Column series: SpringCloud column series

Series of articles:

SpringCloud source code series (1) – Registry Eureka: initialization

SpringCloud source code series (2) – Registry Eureka: service registration and renewal

SpringCloud source code series (3) – Registry Eureka: fetching the registry

SpringCloud source code series (4) – Registry Eureka: service offline, failure, and self-protection mechanism

SpringCloud source code series (5) – Registry Eureka: EurekaServer cluster

SpringCloud source code series (6) – Registry Eureka: summary

SpringCloud source code series (7) – Load balancing Ribbon: RestTemplate

SpringCloud source code series (8) – Load balancing Ribbon: core principles

SpringCloud source code series (9) – Load balancing Ribbon: core components and configuration

SpringCloud source code series (10) – Load balancing Ribbon: HTTP client components

SpringCloud source code series (11) – Load balancing Ribbon: retries and summary

SpringCloud source code series (12) – Service invocation Feign: basic usage

SpringCloud source code series (13) – Service invocation Feign: scanning @FeignClient annotated interfaces

SpringCloud source code series (14) – Service invocation Feign: building dynamic proxies for @FeignClient interfaces

SpringCloud source code series (15) – Service invocation Feign: load-balanced requests with Ribbon

SpringCloud source code series (16) – Circuit breaker Hystrix: basic introduction

SpringCloud source code series (17) – Circuit breaker Hystrix: obtaining the execution subscription object Observable

Thread pool isolation execution principle

Hystrix thread pool initialization

When a HystrixCommand is constructed, its Hystrix thread pool (HystrixThreadPool) is initialized. The initialization logic lives in the constructor of the default implementation class, HystrixThreadPoolDefault, which is also the core component of thread pool scheduling.

The underlying ThreadPoolExecutor is created by the call concurrencyStrategy.getThreadPool(threadPoolKey, properties). The queue size (queueSize) defaults to -1, and the queue itself is obtained from the ThreadPoolExecutor, so to understand how the thread pool is built we need to keep reading into concurrencyStrategy.getThreadPool.

class HystrixThreadPoolDefault implements HystrixThreadPool {
  private final HystrixThreadPoolProperties properties;
  private final BlockingQueue<Runnable> queue;
  private final ThreadPoolExecutor threadPool;
  private final HystrixThreadPoolMetrics metrics;
  private final int queueSize;

  public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
      // Thread pool configuration
      this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
      // Concurrency policy
      HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
      // Queue size, default -1
      this.queueSize = properties.maxQueueSize().get();

      // Hystrix thread pool metric
      this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
              concurrencyStrategy.getThreadPool(threadPoolKey, properties), // Create a thread pool
              properties);
      // By default this returns a thread pool that uses a SynchronousQueue, with core size and maximum size both defaulting to 10
      this.threadPool = this.metrics.getThreadPool();
      // SynchronousQueue
      this.queue = this.threadPool.getQueue();

      /* strategy: HystrixMetricsPublisherThreadPool */
      HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
  }
}

Looking at the getThreadPool method, which builds the ThreadPoolExecutor from the configuration, we learn the following.

  • First, getThreadFactory shows that thread names have the format "hystrix-{threadPoolKey}-{number}", which matches the Hystrix thread names seen in the logs.
  • getBlockingQueue is called to obtain the queue. Because maxQueueSize defaults to -1, the queue returned is a SynchronousQueue, which has no capacity. By default, therefore, tasks are never queued: if every thread in the pool is busy, the task is rejected.
  • If allowMaximumSizeToDivergeFromCoreSize is enabled and the maximum size is greater than the core size, then once all core threads are busy the pool keeps creating threads until the maximum size is reached. Otherwise the pool is created with the core size and the maximum size equal.

To summarize: by default the core size (dynamicCoreSize) and the maximum size (dynamicMaximumSize) are both 10, maxQueueSize is -1, and allowMaximumSizeToDivergeFromCoreSize is false.

The ThreadPoolExecutor created by default therefore has the following characteristics:

  • The number of core threads equals the maximum number of threads.
  • The work queue is a SynchronousQueue, which holds no tasks.
  • As a consequence of the two points above, the pool can run at most 10 tasks at the same time; any further task is rejected.
  • Thread names have the format "hystrix-{threadPoolKey}-{number}".

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    // Thread name format: "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()
    final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    // Whether to allow the number of core threads to expand to the maximum number of threads. Default is false
    final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    // Number of core threads
    final int dynamicCoreSize = threadPoolProperties.coreSize().get();
    // Thread lifetime
    final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
    // maxQueueSize defaults to -1
    final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
    // maxQueueSize <= 0 ==> SynchronousQueue
    // maxQueueSize > 0 ==> LinkedBlockingQueue(maxQueueSize)
    final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

    // Allows scaling up to the maximum number of threads
    if (allowMaximumSizeToDivergeFromCoreSize) {
        // Maximum number of threads
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        // Diverging is not allowed: core size == maximum size. This is the default path
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {
    return new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        }
    };
}

public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
    // maxQueueSize defaults to -1
    if (maxQueueSize <= 0) {
        return new SynchronousQueue<Runnable>();
    } else {
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }
}
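The default behaviour described above (core size equal to maximum size, plus a SynchronousQueue) can be reproduced with plain JDK classes. The following is a minimal sketch, not Hystrix code: the class name SynchronousQueuePoolDemo and the method countRejections are made up for illustration. It submits one more blocking task than the pool has threads and counts the rejections.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only JDK types are used.
class SynchronousQueuePoolDemo {

    /** Submits poolSize + 1 blocking tasks and returns how many were rejected. */
    static int countRejections(int poolSize) {
        // Same shape as the default pool described above: core == max, no queue capacity.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < poolSize + 1; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until the test is over
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // no idle worker and nowhere to queue the task
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejections(2));
    }
}
```

With a pool of 2 threads, the third task is the one that gets rejected, which mirrors what happens to the eleventh concurrent command under the default Hystrix settings.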

Thread pools schedule tasks

Create the scheduler

In the previous article we saw that the subscription Observable is scheduled in the executeCommandWithSpecifiedIsolation(_cmd) method. The entry point is the subscribeOn(Scheduler scheduler) call. The scheduler is obtained from the Hystrix thread pool initialized in the previous section: threadPool.getScheduler(Func0 func) returns a HystrixContextScheduler.

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // Thread pool isolation
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // ...
                // Get the Observable that wraps run()
                return getUserExecutionObservable(_cmd);
            }
        })
        .doOnTerminate(new Action0() {... })
        .doOnUnsubscribe(new Action0() {... })
        // Subscribe the Observable returned by defer on a scheduler so it runs asynchronously. The scheduler ==> HystrixContextScheduler
        .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                // Decide whether to interrupt the executing thread
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    }
    // Semaphore isolation
    else {
        // ...
    }
}

Next, let's see how the scheduler is created.

As the getScheduler method shows, touchConfig() first updates the configuration of the ThreadPoolExecutor, which means the thread pool parameters can be changed dynamically at run time. However, only corePoolSize, maximumPoolSize, and keepAliveTime can be changed, so the capacity of the thread pool's queue cannot be expanded dynamically.

The HystrixContextScheduler is then created. Its constructor creates an actualScheduler internally, whose concrete type is ThreadPoolScheduler, so we can expect the tasks to be scheduled by ThreadPoolScheduler in the end.

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // Dynamically change the thread pool configuration
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;

    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }

    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        threadPool.setCorePoolSize(dynamicCoreSize);
        threadPool.setMaximumPoolSize(dynamicMaximumSize);
    }

    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
    this.concurrencyStrategy = concurrencyStrategy;
    this.threadPool = threadPool;
    // actualScheduler => rx.Scheduler => ThreadPoolScheduler
    this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}
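To make the point about run-time resizing concrete, here is a small sketch that uses only the JDK ThreadPoolExecutor. It is not the Hystrix implementation: DynamicPoolResizeDemo, resize, and resizeDefaultPool are hypothetical names, and the ordering of the two setter calls is my own choice, made because recent JDK versions reject a core size that is larger than the current maximum size.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only JDK types are used.
class DynamicPoolResizeDemo {

    /** Applies new sizes in an order that never leaves core > max, even transiently. */
    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newMax < newCore) {
            newMax = newCore; // similar in spirit to the maxTooLow correction in touchConfig()
        }
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax); // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);   // shrinking: lower the core size first
            pool.setMaximumPoolSize(newMax);
        }
    }

    /** Builds a 10/10 pool with a bounded queue, resizes it, and returns {core, max, queueCapacity}. */
    static int[] resizeDefaultPool(int newCore, int newMax) {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, queue);
        try {
            resize(pool, newCore, newMax);
            // The queue capacity was fixed by its constructor; there is no setter for it.
            return new int[] {pool.getCorePoolSize(), pool.getMaximumPoolSize(), queue.remainingCapacity()};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = resizeDefaultPool(20, 30);
        System.out.println("core=" + sizes[0] + " max=" + sizes[1] + " queueCapacity=" + sizes[2]);
    }
}
```

The core and maximum sizes follow the new values, while the queue capacity stays at 100, which is exactly the limitation noted above.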

Task scheduling execution

Below is the source code for the thread pool scheduling process. Creating the scheduler and scheduling execution look like layers of nested proxies.

  • The process starts by creating the Hystrix context scheduler, HystrixContextScheduler, but the actual scheduler inside it is ThreadPoolScheduler.
  • Once the scheduler exists, a worker is created. HystrixContextScheduler's createWorker() creates a HystrixContextSchedulerWorker, but the worker it wraps is the ThreadPoolWorker created by ThreadPoolScheduler.
  • Finally the task is scheduled. HystrixContextSchedulerWorker's schedule method runs first, and it in turn delegates to ThreadPoolWorker to perform the scheduling.

In conclusion, the real scheduling logic lives in the schedule methods of HystrixContextSchedulerWorker and ThreadPoolWorker.

HystrixContextScheduler ::
public Worker createWorker() {
    // actualScheduler.createWorker() ==> ThreadPoolWorker
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

ThreadPoolScheduler ::
public Worker createWorker() {
    return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}

HystrixContextSchedulerWorker ::
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    // If the queue is full, throw an exception
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    // Delegate to the wrapped worker ==> ThreadPoolWorker
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}

ThreadPoolWorker ::
public Subscription schedule(final Action0 action) {
    // Wrap the action in a ScheduledAction
    ScheduledAction sa = new ScheduledAction(action);

    subscription.add(sa);
    sa.addParent(subscription);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // Submit the task to the thread pool
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // Add a cancellation subscription ==> shouldInterruptThread.call() decides whether a timed-out task is interrupted
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}

For ease of understanding, the source code is abstracted into the following scheduling flow chart:

Rejecting execution when the thread pool is full

The schedule method of HystrixContextSchedulerWorker first calls threadPool.isQueueSpaceAvailable() to check whether the thread pool queue has space available. If it does not, a rejection exception is thrown; if it does, the task is handed to ThreadPoolWorker for scheduling.

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    // If the queue is full, throw an exception
    if (threadPool != null) {
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    // Delegate to the wrapped worker ==> ThreadPoolWorker
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}

Next, look at the logic of isQueueSpaceAvailable.

By default queueSize is -1. When queueSize <= 0 the method reports that space is available, even though the queue is actually a SynchronousQueue with no capacity. Returning true simply hands the task straight to the thread pool, and if all worker threads are busy the thread pool itself rejects the task, as the code comment explains.

If a queue size is configured (maxQueueSize), the queue is considered to have space only while the number of queued tasks is less than queueSizeRejectionThreshold (default 5). This may look puzzling: why not simply check whether the queue has remaining capacity, that is, threadPool.getQueue().remainingCapacity() > 0?

As the Javadoc explains, the goal is to make the effective queue size adjustable at run time. The capacity of the queue itself cannot be changed dynamically, so a second setting, queueSizeRejectionThreshold, controls how many tasks may actually be queued. For example, if maxQueueSize is 100 but queueSizeRejectionThreshold keeps its default of 5, at most 5 tasks can be queued. If queueSizeRejectionThreshold is changed to 20 at run time, up to 20 tasks can then be queued. So the configuration only makes sense when maxQueueSize is larger than queueSizeRejectionThreshold. This design is worth learning from.

/**
 * Whether the threadpool queue has space available according to the queueSizeRejectionThreshold settings.
 *
 * Note that the queueSize is an final instance variable on HystrixThreadPoolDefault, and not looked up dynamically.
 * The data structure is static, so this does not make sense as a dynamic lookup.
 * The queueSizeRejectionThreshold can be dynamic (up to queueSize), so that should
 * still get checked on each invocation.
 *
 * If a SynchronousQueue implementation is used (maxQueueSize <= 0), it always returns 0 as the size so this would always return true.
 */
@Override
public boolean isQueueSpaceAvailable() {
    // The queue size defaults to -1
    if (queueSize <= 0) {
        // we don't have a queue so we won't look for space but instead let the thread-pool reject or not
        return true;
    } else {
        // Space is available while the number of queued tasks is below the rejection threshold; queueSizeRejectionThreshold defaults to 5
        return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
    }
}
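The example with maxQueueSize 100 and a threshold that moves from 5 to 20 can be played through with a small stand-alone sketch. This is an illustration of the idea only: RejectionThresholdDemo, setRejectionThreshold, and enqueue are hypothetical names, and an AtomicInteger stands in for the dynamic Hystrix property.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only JDK types are used.
class RejectionThresholdDemo {
    private final int queueSize;                     // fixed once the queue is built
    private final BlockingQueue<Runnable> queue;
    private final AtomicInteger rejectionThreshold;  // may change at run time

    RejectionThresholdDemo(int maxQueueSize, int initialThreshold) {
        this.queueSize = maxQueueSize;
        this.queue = maxQueueSize <= 0
                ? new SynchronousQueue<Runnable>()
                : new LinkedBlockingQueue<Runnable>(maxQueueSize);
        this.rejectionThreshold = new AtomicInteger(initialThreshold);
    }

    void setRejectionThreshold(int threshold) {
        rejectionThreshold.set(threshold);
    }

    boolean isQueueSpaceAvailable() {
        if (queueSize <= 0) {
            return true; // no real queue: leave the decision to the thread pool
        }
        return queue.size() < rejectionThreshold.get();
    }

    /** Tries to enqueue up to `attempts` no-op tasks and returns how many were accepted. */
    int enqueue(int attempts) {
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (isQueueSpaceAvailable() && queue.offer(() -> { })) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        RejectionThresholdDemo demo = new RejectionThresholdDemo(100, 5);
        System.out.println(demo.enqueue(50)); // 5: the threshold, not the capacity, is the limit
        demo.setRejectionThreshold(20);
        System.out.println(demo.enqueue(50)); // 15: room for 15 more before 20 are queued
    }
}
```

Nothing is consumed from the queue in this sketch, so the accepted counts depend only on the threshold. The queue capacity of 100 never comes into play.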

The command is submitted to the thread pool for execution

Finally, look at the schedule method of ThreadPoolWorker. This is where the task is actually submitted to the thread pool, and it is this step that implements thread-pool-based isolation.

@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        // don't schedule, we are unsubscribed
        return Subscriptions.unsubscribed();
    }

    // This is internal RxJava API but it is too useful.
    ScheduledAction sa = new ScheduledAction(action);

    subscription.add(sa);
    sa.addParent(subscription);

    // Thread pool
    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    // Submit the task to the thread pool
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    // Add a cancellation subscription ==> shouldInterruptThread.call() decides whether a timed-out task is interrupted
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}

At the end of the schedule method, a FutureCompleterWithConfigurableInterrupt subscription is added to the ScheduledAction that was submitted to the thread pool. When that subscription is unsubscribed, for example because the task timed out, it cancels the task's execution.

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }

    @Override
    public void unsubscribe() {
        executor.remove(f);
        // Decide whether the running thread is interrupted when the task is cancelled
        if (shouldInterruptThread.call()) {
            f.cancel(true);
        } else {
            f.cancel(false);
        }
    }
}
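The difference between f.cancel(true) and f.cancel(false) in unsubscribe() is easy to miss, so here is a small JDK-only sketch of it. The names CancelInterruptDemo and wasInterrupted are invented for this illustration; the blocking latch stands in for a slow remote call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; only JDK types are used.
class CancelInterruptDemo {

    /** Returns true if the running task saw an interrupt after cancel(mayInterrupt). */
    static boolean wasInterrupted(boolean mayInterrupt) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> f = executor.submit(() -> {
                started.countDown();
                try {
                    finish.await(); // stands in for a slow remote call
                    // The interrupt flag may be set without an exception; check it as well.
                    interrupted.set(Thread.currentThread().isInterrupted());
                } catch (InterruptedException e) {
                    interrupted.set(true); // only reachable after cancel(true)
                } finally {
                    done.countDown();
                }
            });
            started.await();          // make sure the task is already running
            f.cancel(mayInterrupt);   // true interrupts the worker, false only marks the future cancelled
            finish.countDown();       // lets a task that was not interrupted finish normally
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("cancel(true)  interrupted the task: " + wasInterrupted(true));
        System.out.println("cancel(false) interrupted the task: " + wasInterrupted(false));
    }
}
```

With cancel(false) the future is marked cancelled but the code that is already running carries on to the end, which is why Hystrix lets shouldInterruptThread decide between the two.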

Timeout detection interrupts the thread

Timeout handler

Earlier we saw the following code in executeCommandAndObserve(_cmd). The Observable returned by executeCommandWithSpecifiedIsolation(_cmd) mainly applies the configured isolation strategy. Then, if timeouts are enabled (they are by default), a HystrixObservableTimeoutOperator is added to it, which evidently controls timeouts.

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // ...
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        // HystrixObservableTimeoutOperator controls the timeout
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    // Set the subscription callback
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

Next, look at HystrixObservableTimeoutOperator. It returns a Subscriber that processes the original Observable.

  • First, a HystrixContextRunnable callback is defined that emits a HystrixTimeoutException.
  • Next, a TimerListener is defined. The interval of this listener, returned by getIntervalTimeInMilliseconds(), is the command execution timeout.
  • The main job of the TimerListener is therefore to update the command's isCommandTimedOut status once the timeout elapses. If the status is still NOT_EXECUTED at that point, it is updated to TIMED_OUT, the task's execution is cancelled, and a timeout exception is emitted.
  • The TimerListener is then added to HystrixTimer, so the core timing logic should be in HystrixTimer.
  • Finally, a Subscriber is defined. It updates the isCommandTimedOut status and clears the TimerListener.

The TimedOutStatus of isCommandTimedOut has three states: NOT_EXECUTED, COMPLETED, and TIMED_OUT.

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
    final AbstractCommand<R> originalCommand;

    public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
        this.originalCommand = originalCommand;
    }

    @Override
    public Subscriber<? super R> call(final Subscriber<? super R> child) {
        final CompositeSubscription s = new CompositeSubscription();
        child.add(s);

        // The timeout callback emits HystrixTimeoutException
        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
            @Override
            public void run() {
                child.onError(new HystrixTimeoutException());
            }
        });

        // Timeout listener
        TimerListener listener = new TimerListener() {
            // tick is called once per interval
            @Override
            public void tick() {
                // Update the status on timeout
                if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                    // Report the timeout event
                    originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
                    // Stop the original request
                    s.unsubscribe();
                    // Emit the timeout exception
                    timeoutRunnable.run();
                }
            }

            // The interval, which is the command execution timeout
            @Override
            public int getIntervalTimeInMilliseconds() {
                return originalCommand.properties.executionTimeoutInMilliseconds().get();
            }
        };

        // Register the listener and start the timeout clock
        final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
        // Store the reference on the original command
        originalCommand.timeoutTimer.set(tl);

        Subscriber<R> parent = new Subscriber<R>() {
            @Override
            public void onCompleted() {
                if (isNotTimedOut()) {
                    tl.clear(); // remove the TimerListener
                    child.onCompleted();
                }
            }

            // ...

            private boolean isNotTimedOut() {
                // When the task finishes, update the isCommandTimedOut status
                return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                        originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
            }
        };

        s.add(parent);
        return parent;
    }
}

Timeout processor scheduling

HystrixTimer's addTimerListener method is easy to follow. It initializes the scheduler (a ScheduledThreadPoolExecutor) and schedules the TimerListener on it. Both the initial delay and the period are the command timeout returned by the TimerListener.

To sum up, TimerListener and HystrixTimer work together to detect whether a task has timed out. On timeout, a timeout exception is emitted and the task's execution is cancelled, after which the fallback logic takes over.

public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    // Initialize the executor scheduler: ScheduledThreadPoolExecutor
    startThreadIfNeeded();

    Runnable r = new Runnable() {
        @Override
        public void run() {
            try {
                // Trigger the listener
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };

    // The timeout is 1000 milliseconds by default, so the listener is scheduled every 1000 milliseconds
    ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r,
            listener.getIntervalTimeInMilliseconds(), // Initial delay
            listener.getIntervalTimeInMilliseconds(), // Period
            TimeUnit.MILLISECONDS);
    return new TimerReference(listener, f);
}
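The interplay between the timer tick and the completing command comes down to two compare-and-set calls racing on one status field. The sketch below models that with JDK classes only. TimeoutRaceDemo and its run method are hypothetical, Thread.sleep stands in for the command's work, and the enum simply reuses the three state names from the article.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; only JDK types are used.
class TimeoutRaceDemo {
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    /** Runs fake work of workMillis against a timer of timeoutMillis and returns the final status. */
    static TimedOutStatus run(long workMillis, long timeoutMillis) {
        AtomicReference<TimedOutStatus> status = new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Timer side, like TimerListener.tick(): wins only if the command has not completed yet.
            ScheduledFuture<?> tick = timer.scheduleAtFixedRate(
                    () -> status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT),
                    timeoutMillis, timeoutMillis, TimeUnit.MILLISECONDS);
            try {
                Thread.sleep(workMillis); // stands in for the command's work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Command side, like isNotTimedOut(): wins only if the timer has not fired yet.
            if (status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
                tick.cancel(false); // like tl.clear(): the listener is no longer needed
            }
            return status.get();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(20, 5000)); // work finishes long before the timer fires
        System.out.println(run(300, 30));  // timer fires long before the work finishes
    }
}
```

Because both sides start from NOT_EXECUTED and use compareAndSet, exactly one of them can win, so a command is never reported as both completed and timed out.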

Fallback callbacks

Execution error types

In the Hystrix basics article we listed six types of Hystrix execution errors, shown in the figure below. All of them except BAD_REQUEST end up in the fallback method.

AbstractCommand contains the following six error-handling methods, one for each of Hystrix's six error types. From analyzing the execution flow of HystrixCommand we know that there are many callbacks, and an error callback always ends up calling one of the methods below to handle the error.

Note that apart from handleBadRequestByEmittingError(), which handles BAD_REQUEST, the other five methods all end by calling getFallbackOrThrowException to obtain the Observable of the fallback method, which matches the figure above. In other words, getFallbackOrThrowException encapsulates the fallback handling for errors.

// Semaphore rejection fallback
private Observable<R> handleSemaphoreRejectionViaFallback() {
    Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
    executionResult = executionResult.setExecutionException(semaphoreRejectionException);
    eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            		"could not acquire a semaphore for execution", semaphoreRejectionException);
}

// Short-circuit fallback
private Observable<R> handleShortCircuitViaFallback() {
    eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
    Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
    executionResult = executionResult.setExecutionException(shortCircuitException);
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                	"short-circuited", shortCircuitException);
}

// The thread pool rejects callbacks
private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
    eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
    threadPool.markThreadRejection();
    return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, 
    			"could not be queued for execution", underlying);
}

// Timeout fallback
private Observable<R> handleTimeoutViaFallback() {
    return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT,
                "timed-out", new TimeoutException());
}

// Failed callback
private Observable<R> handleFailureViaFallback(Exception underlying) {
    // ...
    eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);
    return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
}

// BAD_REQUEST handling
private Observable<R> handleBadRequestByEmittingError(Exception underlying) {
    Exception toEmit = underlying;
    // ...
    return Observable.error(toEmit);
}

Fallback handling

Next, look at getFallbackOrThrowException, which handles the fallback. It tells us the following:

  • A semaphore limits concurrency during the fallback. getFallbackSemaphore() returns the semaphore, whose actual type by default is TryableSemaphore, with a default limit of 10. The fallback method is invoked only after a permit has been acquired from the semaphore.
  • After a permit is acquired, getFallbackObservable() is called to get the fallback Observable. If the user overrides getFallback(), the Observable wraps getFallback(); if not, a "No fallback available." exception is thrown.
  • The final step is to subscribe to the fallback Observable, which enters the fallback method.

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // Fallback enabled
    if (properties.fallbackEnabled().get()) {
        // Set the request context on the current thread
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {... };
        // Fallback emit notification
        final Action1<R> markFallbackEmit = new Action1<R>() {... };
        // Fallback completion notification
        final Action0 markFallbackCompleted = new Action0() {... };
        // Handling of errors raised by the fallback
        final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {... };
        // Fallback semaphore => TryableSemaphore
        final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // Release the semaphore permit
        final Action0 singleSemaphoreRelease = new Action0() {... };
        // The fallback Observable
        Observable<R> fallbackExecutionChain;
        // Acquire a semaphore permit
        if (fallbackSemaphore.tryAcquire()) {
            try {
                // Has the user defined a fallback by overriding getFallback()?
                if (isFallbackUserDefined()) {
                    executionHook.onFallbackStart(this);
                    fallbackExecutionChain = getFallbackObservable();
                } else {
                    // Throws an exception: "No fallback available."
                    fallbackExecutionChain = getFallbackObservable();
                }
            } catch (Throwable ex) {
                fallbackExecutionChain = Observable.error(ex);
            }

            // Run the fallback
            return fallbackExecutionChain
                    .doOnEach(setRequestContext)
                    .lift(new FallbackHookApplication(_cmd))
                    .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                    .doOnNext(markFallbackEmit)
                    .doOnCompleted(markFallbackCompleted)
                    .onErrorResumeNext(handleFallbackError)
                    .doOnTerminate(singleSemaphoreRelease)
                    .doOnUnsubscribe(singleSemaphoreRelease);
        } else {
            return handleFallbackRejectionByEmittingError();
        }
    } else {
        return handleFallbackDisabledByEmittingError(originalException, failureType, message);
    }
}
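The acquire, run, release pattern around the fallback can be sketched with java.util.concurrent.Semaphore. Note the assumptions: Hystrix uses its own TryableSemaphore rather than the JDK class, and FallbackSemaphoreDemo, runFallback, and nestedFallback are names invented for this example. The nested call is just a deterministic way to have two fallbacks in flight at once.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical demo class; the JDK Semaphore stands in for Hystrix's TryableSemaphore.
class FallbackSemaphoreDemo {
    private final Semaphore fallbackSemaphore;

    FallbackSemaphoreDemo(int maxConcurrentFallbacks) {
        this.fallbackSemaphore = new Semaphore(maxConcurrentFallbacks);
    }

    /** Runs the fallback only if a permit is free; otherwise fails fast without blocking. */
    String runFallback(Supplier<String> fallback) {
        if (!fallbackSemaphore.tryAcquire()) {
            throw new IllegalStateException("fallback rejected: too many concurrent fallbacks");
        }
        try {
            return fallback.get();
        } finally {
            fallbackSemaphore.release(); // always give the permit back
        }
    }

    /** Starts a second fallback while the first one still holds its permit. */
    static String nestedFallback(int permits) {
        FallbackSemaphoreDemo demo = new FallbackSemaphoreDemo(permits);
        try {
            return demo.runFallback(() -> demo.runFallback(() -> "inner fallback ran"));
        } catch (IllegalStateException e) {
            return "rejected";
        }
    }

    public static void main(String[] args) {
        System.out.println(nestedFallback(1)); // rejected: the only permit is already taken
        System.out.println(nestedFallback(2)); // inner fallback ran
    }
}
```

The key design point is that tryAcquire never blocks: when too many fallbacks are running, the extra ones fail immediately instead of piling up behind the semaphore.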

A diagram summarizes the Hystrix thread pool isolation running process

The following diagram summarizes the process of submitting Hystrix tasks to the thread pool for execution.

Working principle of circuit breaker

Initialization of circuit breakers

The AbstractCommand constructor initializes the circuit breaker, HystrixCircuitBreaker. If circuitBreakerEnabled is true, a circuit breaker is obtained from HystrixCircuitBreaker.Factory, and its default implementation class is HystrixCircuitBreakerImpl. If the circuit breaker is not enabled, the implementation class is NoOpCircuitBreaker, which does nothing.

this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker,
        this.commandGroup, this.commandKey, this.properties, this.metrics);

private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    if (enabled) {
        if (fromConstructor == null) {
            // Get the default circuit breaker from Factory
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            return fromConstructor;
        }
    } else {
        // Circuit breaker disabled: return an implementation class that does nothing
        return new NoOpCircuitBreaker();
    }
}

The HystrixCircuitBreakerImpl constructor mainly needs HystrixCommandProperties and HystrixCommandMetrics. As you might expect, HystrixCommandMetrics is the circuit breaker's data source: it counts the failed, timed-out, rejected, and successful Hystrix requests, and the circuit breaker uses those counts to decide whether to open.

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;
}

Working mechanism of circuit breaker

The breaker allows the request to be executed

When we analyzed applyHystrixSemantics(_cmd) earlier, we saw that it is the core entry point for applying Hystrix semantics. It first asks the circuit breaker whether the request is allowed. If the circuit breaker does not allow it, the request is short-circuited and goes straight to the fallback (degraded) method.

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // Whether the breaker allows requests
    if (circuitBreaker.allowRequest()) {
        // ...
    } else {
        // Short circuit => fallback
        return handleShortCircuitViaFallback();
    }
}

Now look at the allowRequest() method:

  • First, check whether the circuit breaker is forced open. If it is, the request is rejected. This lets us open the circuit breaker manually in some situations to prevent cascading failures.
  • Then, check whether the circuit breaker is forced closed. If it is, the request is allowed. That is, even if the circuit breaker is open, we can force it closed manually.
  • If the circuit breaker is neither forced open nor forced closed, the request is allowed when the circuit breaker is not open, or when it is open but lets a single request through to verify that the called service has recovered. Otherwise, the request is rejected.

public boolean allowRequest() {
    // The circuit breaker is manually forced open
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // The circuit breaker is manually forced closed
    if (properties.circuitBreakerForceClosed().get()) {
        // Still run the statistics that decide whether to open the circuit breaker
        isOpen();
        return true;
    }
    // The circuit breaker is not open, or it allows a single test request
    return !isOpen() || allowSingleTest();
}
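
The order of the three checks matters: if both force flags are set, the forced-open check wins because it runs first. The following is a minimal stand-alone sketch of that decision order, not Hystrix code. The class name AllowRequestSketch and its boolean parameters are hypothetical stand-ins for the property lookups and for the results of isOpen() and allowSingleTest(), and the side effect of calling isOpen() in the forced-closed branch is left out.

```java
// Simplified model of the decision order in allowRequest().
// AllowRequestSketch and its parameters are illustrative names, not Hystrix API.
final class AllowRequestSketch {

    static boolean allowRequest(boolean forceOpen, boolean forceClosed,
                                boolean open, boolean singleTestAllowed) {
        if (forceOpen) {
            return false; // forced open is checked first and always rejects
        }
        if (forceClosed) {
            return true;  // forced closed allows the request regardless of state
        }
        // Normal case: closed breaker, or open breaker that permits one test request
        return !open || singleTestAllowed;
    }

    public static void main(String[] args) {
        System.out.println(allowRequest(true, true, false, false));  // false
        System.out.println(allowRequest(false, true, true, false));  // true
        System.out.println(allowRequest(false, false, true, false)); // false
        System.out.println(allowRequest(false, false, true, true));  // true
    }
}
```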

Circuit breaker open

The isOpen() method checks whether the circuit breaker is open:

  • First, check whether circuitOpen is true. If it is, the circuit breaker is open. In other words, the state of the circuit breaker is held in circuitOpen, an AtomicBoolean.
  • If circuitOpen is false, the circuit breaker is not open yet. In that case, get the HealthCounts statistics from HystrixCommandMetrics. The logic that follows uses HealthCounts to decide whether to open the circuit breaker.
  • If total requests < the circuit breaker request threshold (default 20), the circuit breaker is not opened.
  • If the error percentage < the circuit breaker error percentage threshold (default 50%), the circuit breaker is not opened.
  • Otherwise, the circuit breaker is opened: circuitOpen is set to true and the open time is recorded.

The Hystrix circuit breaker makes good use of atomic classes. circuitOpen is updated with compareAndSet, which is based on the CAS instruction, so the update is safe even under high concurrency. The open time is set only by the thread whose update succeeds. A thread whose update fails does not touch it.

private AtomicBoolean circuitOpen = new AtomicBoolean(false);

public boolean isOpen() {
    // circuitOpen indicates whether the circuit breaker is open
    if (circuitOpen.get()) {
        return true;
    }

    // Get the request statistics
    HealthCounts health = metrics.getHealthCounts();

    // Below the circuit breaker request threshold (default 20): the circuit breaker can only open once there are at least 20 requests
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        return false;
    }

    // Error percentage
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // Open the circuit breaker
        if (circuitOpen.compareAndSet(false, true)) {
            // Set the circuit breaker open time
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            return true;
        }
    }
}
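
To make the two thresholds and the CAS update concrete, here is a small stand-alone sketch that uses the default values from the text (20 requests, 50%). It is not Hystrix code. TripSketch, shouldTrip, raceToOpen, and the openTimeWrites counter are hypothetical names, the request and error counts are passed in directly instead of coming from HealthCounts, and truncating the error percentage to an integer is an assumption of this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the trip decision in isOpen(); names are illustrative, not Hystrix API.
final class TripSketch {
    static final int REQUEST_VOLUME_THRESHOLD = 20;   // default from the text
    static final int ERROR_THRESHOLD_PERCENTAGE = 50; // default from the text

    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    // Counts how often the "open time" would be written; stands in for circuitOpenedOrLastTestedTime.set(...)
    private final AtomicInteger openTimeWrites = new AtomicInteger();

    // Pure threshold check: enough requests AND error percentage at or above the threshold.
    static boolean shouldTrip(long totalRequests, long errorCount) {
        if (totalRequests < REQUEST_VOLUME_THRESHOLD) {
            return false;
        }
        // Assumption: the percentage is truncated to an int
        int errorPercentage = (int) ((double) errorCount / totalRequests * 100);
        return errorPercentage >= ERROR_THRESHOLD_PERCENTAGE;
    }

    boolean isOpen(long totalRequests, long errorCount) {
        if (circuitOpen.get()) {
            return true;
        }
        if (!shouldTrip(totalRequests, errorCount)) {
            return false;
        }
        // Only the thread that wins the CAS records the open time
        if (circuitOpen.compareAndSet(false, true)) {
            openTimeWrites.incrementAndGet();
        }
        return true;
    }

    // Lets several threads hit a failing breaker at once and reports how many recorded the open time.
    static int raceToOpen(int threads) {
        TripSketch breaker = new TripSketch();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                    breaker.isOpen(40, 30); // 75% errors over 40 requests
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return breaker.openTimeWrites.get();
    }

    public static void main(String[] args) {
        System.out.println(shouldTrip(19, 19)); // false: below the request threshold
        System.out.println(shouldTrip(20, 10)); // true: 50% is not below the 50% threshold
        System.out.println(raceToOpen(8));      // 1: a single thread wins the CAS
    }
}
```

Note that 19 failing requests out of 19 do not open the breaker, because the request threshold is checked before the error percentage.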

Circuit breaker half open

If the circuit breaker is already open, it sleeps for a period of time and then lets one request through to test whether the called service has recovered. This is the half-open state.

The allowSingleTest() call at the end of allowRequest() decides whether a test request may pass:

  • Check whether the current time is later than the circuit breaker open time plus the circuit breaker sleep window (5000 ms by default). If it is, the open time is updated to the current time and the request is allowed.
  • That is, every 5,000 milliseconds one request is let through as a test. If the request keeps failing, the circuit breaker stays open. This is the half-open state: OPEN -> HALF-OPEN.

public boolean allowSingleTest() {
    // Circuit breaker open time (or the time of the last test)
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // Circuit breaker is open and current time > (open time + sleep window (default 5000 ms))
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // Set the open time to the current time
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            return true;
        }
    }
    return false;
}
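
The sleep-window check can be tried in isolation with a hand-fed clock. The sketch below is an assumption-laden model, not Hystrix code: SleepWindowSketch is a hypothetical class, the current time is passed in as nowMs instead of being read from System.currentTimeMillis(), and the breaker is assumed to be open already, so the circuitOpen.get() check is omitted.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of allowSingleTest() for a breaker that is already open.
// SleepWindowSketch and nowMs are illustrative names, not Hystrix API.
final class SleepWindowSketch {
    static final long SLEEP_WINDOW_MS = 5000; // default from the text

    private final AtomicLong openedOrLastTestedTime;

    SleepWindowSketch(long openedAtMs) {
        this.openedOrLastTestedTime = new AtomicLong(openedAtMs);
    }

    boolean allowSingleTest(long nowMs) {
        long last = openedOrLastTestedTime.get();
        if (nowMs > last + SLEEP_WINDOW_MS) {
            // The CAS restarts the window; only the caller that wins it gets the test request
            return openedOrLastTestedTime.compareAndSet(last, nowMs);
        }
        return false;
    }

    public static void main(String[] args) {
        SleepWindowSketch breaker = new SleepWindowSketch(0);
        System.out.println(breaker.allowSingleTest(5000));  // false: the comparison is strict
        System.out.println(breaker.allowSingleTest(5001));  // true: first test request
        System.out.println(breaker.allowSingleTest(5002));  // false: the window restarted at 5001
        System.out.println(breaker.allowSingleTest(10002)); // true: 10002 > 5001 + 5000
    }
}
```

Because a successful test call moves the timestamp forward, a failing test request automatically buys the called service another full sleep window before the next attempt.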

Circuit breaker restoration

When does the circuit breaker close again? HystrixCircuitBreaker has a method called markSuccess(), which closes the circuit breaker if it is open.

public void markSuccess() {
    if (circuitOpen.get()) {
        // Close the circuit breaker
        if (circuitOpen.compareAndSet(true, false)) {
            metrics.resetStream();
        }
    }
}

In the executeCommandAndObserve(_cmd) method there are two callbacks, markEmits and markOnCompleted, which run when the request emits a result and when it completes. Both call circuitBreaker.markSuccess() to tell the circuit breaker that the request succeeded, and the circuit breaker then moves to the closed state: HALF-OPEN -> CLOSED.

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    // Mark that the command has been executed
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (commandIsScalar()) {
                // ...
                // Tell the circuit breaker that the request succeeded
                circuitBreaker.markSuccess();
            }
        }
    };
    // Mark that the command execution is complete
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                // ...
                // Tell the circuit breaker that the request succeeded
                circuitBreaker.markSuccess();
            }
        }
    };
    // Handle the fallback
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {...};
    // Set the request context for the current thread
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {...};

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        // Timeouts are handled by HystrixObservableTimeoutOperator, which throws HystrixTimeoutException
        execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    // Set the subscription callback
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

A diagram summarizes the working mechanism of a circuit breaker

Finally, a diagram is used to summarize the working mechanism of the circuit breaker.
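
The same transitions can also be walked through in code. The following is a rough single-threaded model of the whole cycle, CLOSED -> OPEN -> HALF-OPEN -> CLOSED, using the default values mentioned above. It is a sketch under stated assumptions rather than the Hystrix implementation: BreakerLifecycleSketch and markFailure are hypothetical names, the plain total and errors counters stand in for HealthCounts, resetting them stands in for metrics.resetStream(), and time is passed in as nowMs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical single-threaded model of the breaker lifecycle; not Hystrix code.
final class BreakerLifecycleSketch {
    static final int REQUEST_VOLUME_THRESHOLD = 20;
    static final int ERROR_THRESHOLD_PERCENTAGE = 50;
    static final long SLEEP_WINDOW_MS = 5000;

    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTestedTime = new AtomicLong();
    private long total;  // stand-in for HealthCounts total requests
    private long errors; // stand-in for HealthCounts error count

    boolean allowRequest(long nowMs) {
        return !isOpen(nowMs) || allowSingleTest(nowMs);
    }

    private boolean isOpen(long nowMs) {
        if (circuitOpen.get()) {
            return true;
        }
        if (total < REQUEST_VOLUME_THRESHOLD) {
            return false;
        }
        if (errors * 100 / total < ERROR_THRESHOLD_PERCENTAGE) {
            return false;
        }
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTestedTime.set(nowMs); // CLOSED -> OPEN
        }
        return true;
    }

    private boolean allowSingleTest(long nowMs) {
        long last = openedOrLastTestedTime.get();
        return circuitOpen.get()
                && nowMs > last + SLEEP_WINDOW_MS
                && openedOrLastTestedTime.compareAndSet(last, nowMs); // OPEN -> HALF-OPEN
    }

    void markFailure() {
        total++;
        errors++;
    }

    void markSuccess() {
        if (circuitOpen.compareAndSet(true, false)) {
            total = 0;  // stands in for metrics.resetStream()
            errors = 0; // HALF-OPEN -> CLOSED
        } else {
            total++;
        }
    }

    public static void main(String[] args) {
        BreakerLifecycleSketch breaker = new BreakerLifecycleSketch();
        for (int i = 0; i < 20; i++) {
            breaker.markFailure();
        }
        System.out.println(breaker.allowRequest(0));    // false: the breaker opens
        System.out.println(breaker.allowRequest(3000)); // false: still inside the sleep window
        System.out.println(breaker.allowRequest(5001)); // true: one test request is let through
        breaker.markSuccess();                          // the test request succeeded
        System.out.println(breaker.allowRequest(5002)); // true: the breaker is closed again
    }
}
```

If the test request had failed instead, markSuccess() would not be called, the breaker would stay open, and the next test request would only be allowed after another sleep window.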