
Background

Some operations are non-core, such as storing operation logs and sending asynchronous messages after inventory is increased or decreased (specific business processes). Currently, if the MQ service has an exception during one of these operations, the whole interface response times out. It is therefore worth introducing service degradation and service isolation for these non-core operations.

Hystrix introduction

The official documentation

Hystrix is Netflix's open-source fault-tolerance library. It addresses the problem of a failing external dependency dragging down the business system that calls it, which can even trigger a cascading failure (an "avalanche").

Why do we need Hystrix?

  • In medium and large distributed systems, a service usually has many dependencies (HTTP, Hessian, Netty, Dubbo, etc.). Under high concurrency, the stability of these dependencies has a great impact on the system. Dependencies also suffer from many problems outside our control, such as slow network connections, busy resources, temporary unavailability, and services going offline.

  • When a dependency blocks, the server's pool threads end up blocked waiting on it, which threatens the stability of the entire online service. In a complex distributed architecture with many dependencies, some dependency will inevitably fail at some point. Under high concurrency, a failing dependency that is not isolated puts the calling service at risk of being dragged down.

For example, suppose a system relies on 30 SOA services, each 99.99% available. Overall availability is then about 99.7%, so roughly 0.3% of requests fail. That is 300,000 failures per 100 million requests, or about 2 hours of service instability per month. Overall availability is the product of the individual availabilities, so it drops exponentially as the number of dependencies grows. Solution: isolate dependencies.
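The arithmetic behind these figures is shown below. It assumes the 30 services fail independently and that a month has 30 days.

$$
\begin{aligned}
A &= 0.9999^{30} \approx 0.99700 \\
1 - A &\approx 0.003 = 0.3\% \\
0.003 \times 10^{8} &= 3 \times 10^{5} \ \text{failed requests} \\
0.003 \times 30 \times 24\ \text{h} &\approx 2.16\ \text{h of instability per month}
\end{aligned}
$$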

Hystrix design philosophy

To understand how to use Hystrix, you must first understand its core design concept. Hystrix is based on the command pattern, which is usually illustrated with a UML diagram.

  • In the command pattern, Command is an intermediate layer added between the Invoker and the Receiver, and the Command encapsulates the Receiver.

  • An API that may introduce latency (for example, a remote interface call or a database query) plays the role of the Receiver. It is encapsulated by extending HystrixCommand, Hystrix's core class.

  • This gives the API resilience protection.
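The following is a minimal plain-Java sketch of the command pattern, with no Hystrix dependency. The class names InventoryLogService, SimpleCommand and SaveLogCommand are hypothetical. SimpleCommand only imitates the run()/getFallback() shape of HystrixCommand; it has no thread pool, timeout, circuit breaker or metrics. The code that calls execute() plays the Invoker.

```java
// Receiver: the real dependency call, which may fail (hypothetical example).
class InventoryLogService {
    private final boolean healthy;

    InventoryLogService(boolean healthy) {
        this.healthy = healthy;
    }

    String saveLog(String entry) {
        if (!healthy) {
            throw new IllegalStateException("MQ unavailable");
        }
        return "saved:" + entry;
    }
}

// Command: wraps the receiver and defines a fallback, loosely mirroring run()/getFallback().
abstract class SimpleCommand<R> {
    protected abstract R run();

    // Default: no fallback, similar in spirit to HystrixCommand's default behavior.
    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    // Runs the command; any runtime exception from run() is routed to the fallback.
    final R execute() {
        try {
            return run();
        } catch (RuntimeException e) {
            return getFallback();
        }
    }
}

// Concrete command for a non-core operation: degrade instead of failing the request.
class SaveLogCommand extends SimpleCommand<String> {
    private final InventoryLogService receiver;
    private final String entry;

    SaveLogCommand(InventoryLogService receiver, String entry) {
        this.receiver = receiver;
        this.entry = entry;
    }

    @Override
    protected String run() {
        return receiver.saveLog(entry);
    }

    @Override
    protected String getFallback() {
        return "degraded:" + entry;
    }
}
```

The invoker just writes `new SaveLogCommand(service, "stock-1").execute()`. It never sees whether the real call or the fallback produced the value.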

How does Hystrix address dependency isolation

  1. Hystrix uses the command pattern and wraps each dependency call in a command object (HystrixCommand). Each command executes in a separate thread or under a semaphore permit.

  2. You can configure a timeout for each dependency call, usually set slightly above the call's 99.5th percentile response time. When a call times out, it returns immediately or the fallback logic is executed.

  3. Each dependency gets its own small thread pool (or semaphore). If the pool is full, the call is rejected immediately instead of being queued (no queuing by default), so failures are detected quickly.

  4. The outcome of a dependency call is one of: success, failure (an exception is thrown), timeout, thread-pool rejection, or short circuit. The fallback logic is executed whenever the request fails (exception, rejection, timeout, short circuit).

  5. A circuit breaker, which trips automatically or can be triggered manually, stops calls to the dependency for a period of time (the sleep window, 5 seconds by default). By default it trips automatically once the error rate reaches 50%.

  6. Provides near real-time metrics and monitoring for every dependency.
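Points 2 and 3 can be approximated with the JDK alone: run the call on a dedicated pool and wait only up to a timeout. This is a simplified sketch, not Hystrix's implementation, which uses its own timer and RxJava operators. TimeoutGuard, callWithTimeout and newDependencyPool are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper: run a dependency call on a separate pool, wait at most timeoutMs,
// and fall back on timeout or failure.
class TimeoutGuard {
    static <T> T callWithTimeout(ExecutorService pool, Callable<T> call, long timeoutMs, Supplier<T> fallback) {
        Future<T> future = pool.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so it stops waiting on the slow dependency
            return fallback.get();
        } catch (ExecutionException e) {
            return fallback.get(); // the call itself threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            return fallback.get();
        }
    }

    // A small fixed-size pool per dependency caps how many threads it can tie up.
    static ExecutorService newDependencyPool(int threads) {
        return Executors.newFixedThreadPool(threads);
    }
}
```

Because the caller only waits on the Future, a hung dependency costs at most timeoutMs of the caller's time. The pool bounds how many threads the dependency can hold.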

Hystrix process structure analysis


Process description:

  1. Each call builds a HystrixCommand or HystrixObservableCommand object, which encapsulates the dependency call in its run() (or construct()) method.

  2. The command is executed synchronously with execute() or asynchronously with queue(). At this point the actual run()/construct() has not been invoked yet.

  3. Check whether the circuit breaker is open. If it is, go to the fallback in step 9; if it is closed, continue to step 4.

  4. Check whether the thread pool, queue, or semaphore is full. If so, go to the fallback in step 9; otherwise, continue with the following steps.

  5. Run the dependency logic via HystrixObservableCommand.construct() or HystrixCommand.run().

  6. If the dependency call times out, go to step 9.

  7. Determine whether the call succeeded:

    • 7a. On success, return the result of the call.

    • 7b. On error, go to step 9.

  8. Calculate circuit health: every outcome (success, failure, rejection, timeout) is reported to the circuit breaker, whose statistics determine whether it opens.

  9. Fallback:

    a. A command that does not implement getFallback throws an exception directly.

    b. If the fallback logic succeeds, its result is returned.

    c. If the fallback logic fails, an exception is thrown.

  10. Return the result.
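The decision order of steps 3 to 10 can be condensed into a single method. This is a hypothetical synchronous sketch: FlowSketch is not a Hystrix class. The circuit check is passed in as a BooleanSupplier and capacity is modeled by a java.util.concurrent.Semaphore. The timeout (step 6) and metrics reporting (step 8) are left out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

class FlowSketch {
    static <T> T execute(BooleanSupplier circuitAllows, Semaphore capacity,
                         Callable<T> run, Supplier<T> fallback) {
        // Step 3: an open circuit goes straight to the fallback.
        if (!circuitAllows.getAsBoolean()) {
            return fallbackOrThrow(fallback, "short-circuited");
        }
        // Step 4: no free permit means the call is rejected.
        if (!capacity.tryAcquire()) {
            return fallbackOrThrow(fallback, "rejected");
        }
        T result = null;
        Exception failure = null;
        try {
            result = run.call();                        // step 5: the dependency logic
        } catch (Exception e) {
            failure = e;
        } finally {
            capacity.release();                         // free the permit before any fallback runs
        }
        if (failure != null) {
            return fallbackOrThrow(fallback, "failed"); // step 7b
        }
        return result;                                  // steps 7a and 10
    }

    // Step 9: without a fallback the caller gets an exception (9a); otherwise the fallback's
    // result is returned (9b), and an exception thrown by the fallback propagates (9c).
    private static <T> T fallbackOrThrow(Supplier<T> fallback, String reason) {
        if (fallback == null) {
            throw new IllegalStateException(reason + " and no fallback available.");
        }
        return fallback.get();
    }
}
```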

getFallback is triggered in four cases:

  1. The run() method throws an exception other than HystrixBadRequestException.

  2. The run() method call times out.

  3. The circuit breaker is open, so the call is short-circuited.

  4. The thread pool, queue, or semaphore is full, so the call is rejected.

Circuit Breaker

By default, each circuit breaker maintains 10 buckets, one per second, forming a 10-second rolling window. Each bucket records successes, failures, timeouts, and rejections. By default, the breaker opens (short-circuits calls) when the window contains at least 20 requests and the error percentage reaches 50%.
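A rolling-window breaker with these defaults might look like the sketch below. RollingCircuitBreaker is hypothetical and much simpler than Hystrix's own circuit breaker. The 5-second sleep window is assumed from Hystrix's documented default for circuitBreaker.sleepWindowInMilliseconds. After the window the sketch lets every request probe the dependency, whereas Hystrix allows a single trial request. The clock is injected so the behavior can be tested.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

class RollingCircuitBreaker {
    private static final int BUCKETS = 10;                 // 10 buckets ...
    private static final long BUCKET_MS = 1000;            // ... of 1 second each: a 10 s window
    private static final int REQUEST_VOLUME_THRESHOLD = 20;
    private static final int ERROR_THRESHOLD_PERCENT = 50;
    private static final long SLEEP_WINDOW_MS = 5000;      // assumed default sleep window

    private final long[] bucketStart = new long[BUCKETS];
    private final int[] successes = new int[BUCKETS];
    private final int[] failures = new int[BUCKETS];
    private final LongSupplier clock;                      // current time in milliseconds
    private long openedAt = -1;                            // -1 means the circuit is closed

    RollingCircuitBreaker(LongSupplier clock) {
        this.clock = clock;
    }

    // Closed: allow. Open: reject until the sleep window has passed, then allow probes.
    synchronized boolean allowRequest() {
        return openedAt < 0 || clock.getAsLong() - openedAt >= SLEEP_WINDOW_MS;
    }

    synchronized void markSuccess() {
        successes[bucketFor(clock.getAsLong())]++;
        if (openedAt >= 0) {                               // a probe succeeded: close and reset stats
            openedAt = -1;
            Arrays.fill(successes, 0);
            Arrays.fill(failures, 0);
        }
    }

    synchronized void markFailure() {
        long now = clock.getAsLong();
        failures[bucketFor(now)]++;
        if (openedAt >= 0) {                               // a probe failed: start a new sleep window
            openedAt = now;
            return;
        }
        long total = 0;
        long errors = 0;
        for (int i = 0; i < BUCKETS; i++) {
            if (now - bucketStart[i] < BUCKETS * BUCKET_MS) { // ignore buckets older than the window
                total += successes[i] + failures[i];
                errors += failures[i];
            }
        }
        if (total >= REQUEST_VOLUME_THRESHOLD && errors * 100 >= ERROR_THRESHOLD_PERCENT * total) {
            openedAt = now;                                // trip the breaker
        }
    }

    // Maps a timestamp to its bucket, clearing the bucket if it still holds an older second's data.
    private int bucketFor(long now) {
        long second = now / BUCKET_MS;
        int idx = (int) (second % BUCKETS);
        if (bucketStart[idx] != second * BUCKET_MS) {
            bucketStart[idx] = second * BUCKET_MS;
            successes[idx] = 0;
            failures[idx] = 0;
        }
        return idx;
    }
}
```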

Hystrix isolation analysis

Hystrix uses thread isolation or semaphore isolation to limit each dependency's concurrency and to stop blocking from spreading.

Thread isolation

  • The thread that executes the dependency code is separate from the requesting thread (for example, a Jetty thread). The requesting thread is free to stop waiting whenever it chooses, because the call is asynchronous.

  • Concurrency can be capped by the size of the thread pool. When the pool is saturated, further requests are rejected early, which keeps a dependency problem from spreading.

  • It is recommended not to set the thread pool too large, otherwise a large number of blocked threads may slow down the server.

A real-world case:

Netflix considers the overhead of thread isolation small enough to have no significant cost or performance impact. Netflix's internal API executes 10 billion HystrixCommand requests per day with thread isolation. Each application has roughly 40+ thread pools, with about 5-20 threads per pool.
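Thread isolation with immediate rejection can be sketched with a plain ThreadPoolExecutor. A SynchronousQueue never holds a task, which matches the "no queuing by default" behavior described earlier. DependencyBulkhead is a hypothetical name, and the sketch leaves out timeouts and metrics.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical per-dependency bulkhead: a small pool with no queue.
class DependencyBulkhead {
    private final ThreadPoolExecutor pool;

    DependencyBulkhead(int threads) {
        // SynchronousQueue stores nothing, so when every thread is busy the AbortPolicy
        // rejects the task immediately instead of letting it wait.
        this.pool = new ThreadPoolExecutor(threads, threads, 1, TimeUnit.MINUTES,
                new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
    }

    // Hands the call to the pool; throws RejectedExecutionException when the pool is saturated.
    <T> Future<T> submit(Callable<T> call) {
        return pool.submit(call);
    }

    // Runs the call on a pool thread and waits for it; rejection or failure goes to the fallback.
    <T> T execute(Callable<T> call, Supplier<T> fallback) {
        try {
            return submit(call).get();
        } catch (RejectedExecutionException | ExecutionException e) {
            return fallback.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

One such bulkhead per dependency means a hung MQ client can exhaust only its own few threads. The request threads of the rest of the service are unaffected.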

Semaphore isolation

Semaphore isolation can also limit concurrent access and keep blocking from spreading. The biggest difference from thread isolation is that the dependency code runs on the requesting thread itself rather than on a separate thread; the requesting thread must first obtain a semaphore permit. If the client is reliable and returns quickly, semaphore isolation can replace thread isolation to reduce overhead.

The size of the semaphore can be adjusted dynamically, but the thread pool size cannot.
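One way to get a dynamically adjustable semaphore is to count permits in use and compare against a limit that is re-read on every acquire. The sketch below is assumed to resemble the idea behind Hystrix's TryableSemaphoreActual, but AdjustableSemaphore and SemaphoreIsolation are hypothetical classes. The protected call runs on the caller's own thread.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

// Counts permits in use and re-reads the limit on every tryAcquire, so the limit can change at runtime.
class AdjustableSemaphore {
    private final AtomicInteger inUse = new AtomicInteger();
    private final IntSupplier limit;

    AdjustableSemaphore(IntSupplier limit) {
        this.limit = limit;
    }

    boolean tryAcquire() {
        if (inUse.incrementAndGet() > limit.getAsInt()) {
            inUse.decrementAndGet(); // over the limit: undo the increment and reject
            return false;
        }
        return true;
    }

    void release() {
        inUse.decrementAndGet();
    }
}

class SemaphoreIsolation {
    // Runs the call on the caller's thread if a permit is available, otherwise falls back at once.
    static <T> T execute(AdjustableSemaphore semaphore, Supplier<T> call, Supplier<T> fallback) {
        if (!semaphore.tryAcquire()) {
            return fallback.get();
        }
        try {
            return call.get();
        } finally {
            semaphore.release();
        }
    }
}
```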

The difference between thread isolation and semaphore isolation is shown below:

Fallback degradation mechanism

Interested readers can refer to the official documentation.

Source code analysis

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

executeCommandAndObserve
    /**
     * This decorates "Hystrix" functionality around the run() Observable.
     * @return R
     */
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // ...
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable,
			Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException. */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    return handleFailureViaFallback(e);
                }
            }
        };
        // ...
        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

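handleFallback is passed to the execution Observable's onErrorResumeNext, so every error from the execution goes through it:

  • RejectedExecutionException: calls handleThreadPoolRejectionViaFallback

  • HystrixTimeoutException: calls handleTimeoutViaFallback

  • HystrixBadRequestException: calls handleBadRequestByEmittingError, which emits the error without running a fallback

  • Any other exception: calls handleFailureViaFallback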
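The dispatch order in handleFallback can be mirrored in a few lines. BadRequestException and TimeoutFailure are hypothetical stand-ins for HystrixBadRequestException and HystrixTimeoutException. The sketch only reports which path would be taken and does not run a fallback.

```java
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for HystrixBadRequestException.
class BadRequestException extends RuntimeException {
    BadRequestException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for HystrixTimeoutException.
class TimeoutFailure extends RuntimeException {
    TimeoutFailure() {
        super("timed-out");
    }
}

class FallbackDispatch {
    enum Route { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST_NO_FALLBACK, FAILURE }

    // Same order of checks as handleFallback: rejection, timeout, bad request, everything else.
    static Route route(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Route.THREAD_POOL_REJECTED;    // handleThreadPoolRejectionViaFallback
        } else if (t instanceof TimeoutFailure) {
            return Route.TIMEOUT;                 // handleTimeoutViaFallback
        } else if (t instanceof BadRequestException) {
            return Route.BAD_REQUEST_NO_FALLBACK; // error emitted, fallback skipped
        } else {
            return Route.FAILURE;                 // handleFailureViaFallback
        }
    }
}
```

The bad-request branch is why HystrixBadRequestException is the usual way to signal invalid arguments: the caller gets the error, and no fallback runs.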

applyHystrixSemantics
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback. No state is left inconsistent
        executionHook.onStart(_cmd);
        /* determine if we're allowed to execute */
        if (circuitBreaker.attemptExecution()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
  • In applyHystrixSemantics, if executionSemaphore.tryAcquire() fails, handleSemaphoreRejectionViaFallback() is called.

  • In applyHystrixSemantics, if circuitBreaker.attemptExecution() fails, handleShortCircuitViaFallback() is called.
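The singleSemaphoreRelease action above exists because both doOnTerminate and doOnUnsubscribe can fire for the same execution. Its AtomicBoolean guard makes sure the permit is returned only once. The same idiom with a JDK Semaphore looks like this (SingleRelease is a hypothetical name):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Several callbacks may try to release the permit; compareAndSet lets only the first one through.
class SingleRelease implements Runnable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    SingleRelease(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}
```

Without the guard, a second call would push the JDK semaphore to 2 permits and silently raise the concurrency limit.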

The ...ViaFallback methods
    private Observable<R> handleSemaphoreRejectionViaFallback() {
        Exception semaphoreRejectionException = new RuntimeException("could not acquire a semaphore for execution");
        executionResult = executionResult.setExecutionException(semaphoreRejectionException);
        eventNotifier.markEvent(HystrixEventType.SEMAPHORE_REJECTED, commandKey);
        logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
        // retrieve a fallback or throw an exception if no fallback available
        return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
                "could not acquire a semaphore for execution", semaphoreRejectionException);
    }

    private Observable<R> handleShortCircuitViaFallback() {
        // record that we are returning a short-circuited fallback
        eventNotifier.markEvent(HystrixEventType.SHORT_CIRCUITED, commandKey);
        // short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
        Exception shortCircuitException = new RuntimeException("Hystrix circuit short-circuited and is OPEN");
        executionResult = executionResult.setExecutionException(shortCircuitException);
        try {
            return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
                    "short-circuited", shortCircuitException);
        } catch (Exception e) {
            return Observable.error(e);
        }
    }

    private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
        eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
        threadPool.markThreadRejection();
        // use a fallback instead (or throw exception if not implemented)
        return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);
    }

    private Observable<R> handleTimeoutViaFallback() {
        return getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
    }

    private Observable<R> handleFailureViaFallback(Exception underlying) {
        /* All other error handling */
        logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);

        // report failure
        eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);

        // record the exception
        executionResult = executionResult.setException(underlying);
        return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
    }
  • handleSemaphoreRejectionViaFallback, handleShortCircuitViaFallback, handleThreadPoolRejectionViaFallback, handleTimeoutViaFallback, and handleFailureViaFallback all call getFallbackOrThrowException.
  • Their event types are SEMAPHORE_REJECTED, SHORT_CIRCUITED, THREAD_POOL_REJECTED, TIMEOUT, and FAILURE, respectively.
  • AbstractCommand.getFallbackOrThrowException

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java


    /**
     * Execute <code>getFallback()</code> within protection of a semaphore that limits number of concurrent executions.
     * <p>
     * Fallback implementations shouldn't perform anything that can be blocking, but we protect against it anyways in case someone doesn't abide by the contract.
     * <p>
     * If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially
     * all threads to pile up and block.
     *
     * @return K
     * @throws UnsupportedOperationException
     *             if getFallback() not implemented
     * @throws HystrixRuntimeException
     *             if getFallback() fails (throws an Exception) or is rejected by the semaphore
     */
    private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
        // record the executionResult
        // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
        executionResult = executionResult.addEvent((int) latency, eventType);

        if (isUnrecoverable(originalException)) {
            logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);

            /* executionHook for all errors */
            Exception e = wrapWithOnErrorHook(failureType, originalException);
            return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + "" + message + " and encountered unrecoverable error.", e, null));
        } else {
            if (isRecoverableError(originalException)) {
                logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
            }

            if (properties.fallbackEnabled().get()) {
                /* fallback behavior is permitted so attempt */

                final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                    @Override
                    public void call(Notification<? super R> rNotification) {
                        setRequestContextIfNeeded(requestContext);
                    }
                };

                final Action1<R> markFallbackEmit = new Action1<R>() {
                    @Override
                    public void call(R r) {
                        if (shouldOutputOnNextEvents()) {
                            executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
                        }
                    }
                };

                final Action0 markFallbackCompleted = new Action0() {
                    @Override
                    public void call() {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
                    }
                };

                final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                    @Override
                    public Observable<R> call(Throwable t) {
                        /* executionHook for all errors */
                        Exception e = wrapWithOnErrorHook(failureType, originalException);
                        Exception fe = getExceptionFromThrowable(t);

                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        Exception toEmit;

                        if (fe instanceof UnsupportedOperationException) {
                            logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + "" + message + " and no fallback available.", e, fe);
                        } else {
                            logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                            eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                            executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

                            toEmit = new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + "" + message + " and fallback failed.", e, fe);
                        }

                        // NOTE: we're suppressing fallback exception here
                        if (shouldNotBeWrapped(originalException)) {
                            return Observable.error(e);
                        }

                        return Observable.error(toEmit);
                    }
                };

                final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                            fallbackSemaphore.release();
                        }
                    }
                };

                Observable<R> fallbackExecutionChain;

                // acquire a permit
                if (fallbackSemaphore.tryAcquire()) {
                    try {
                        if (isFallbackUserDefined()) {
                            executionHook.onFallbackStart(this);
                            fallbackExecutionChain = getFallbackObservable();
                        } else {
                            //same logic as above without the hook invocation
                            fallbackExecutionChain = getFallbackObservable();
                        }
                    } catch (Throwable ex) {
                        //If hook or user-fallback throws, then use that as the result of the fallback lookup
                        fallbackExecutionChain = Observable.error(ex);
                    }

                    return fallbackExecutionChain
                            .doOnEach(setRequestContext)
                            .lift(new FallbackHookApplication(_cmd))
                            .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                            .doOnNext(markFallbackEmit)
                            .doOnCompleted(markFallbackCompleted)
                            .onErrorResumeNext(handleFallbackError)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } else {
                    return handleFallbackRejectionByEmittingError();
                }
            } else {
                return handleFallbackDisabledByEmittingError(originalException, failureType, message);
            }
        }
    }
  • fallbackExecutionChain's onErrorResumeNext calls handleFallbackError.
  • fallbackExecutionChain's doOnCompleted calls markFallbackCompleted.
  • AbstractCommand.getFallbackSemaphore

hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java

    /**
     * Get the TryableSemaphore this HystrixCommand should use if a fallback occurs.
     * 
     * @return TryableSemaphore
     */
    protected TryableSemaphore getFallbackSemaphore() {
        if (fallbackSemaphoreOverride == null) {
            TryableSemaphore _s = fallbackSemaphorePerCircuit.get(commandKey.name());
            if (_s == null) {
                // we didn't find one cache so setup
                fallbackSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.fallbackIsolationSemaphoreMaxConcurrentRequests()));
                // assign whatever got set (this or another thread)
                return fallbackSemaphorePerCircuit.get(commandKey.name());
            } else {
                return _s;
            }
        } else {
            return fallbackSemaphoreOverride;
        }
    }

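getFallbackSemaphore gets, or lazily creates, one TryableSemaphoreActual per commandKey. The semaphore is sized by fallbackIsolationSemaphoreMaxConcurrentRequests and cached in fallbackSemaphorePerCircuit.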
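The putIfAbsent-then-get sequence may create a TryableSemaphoreActual that loses the race and is discarded, which is harmless. On Java 8+, ConcurrentHashMap.computeIfAbsent expresses the same per-key cache more directly. FallbackSemaphores below is a hypothetical class that uses a JDK Semaphore in place of TryableSemaphore.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

// Hypothetical per-command-key registry of fallback semaphores.
class FallbackSemaphores {
    private final ConcurrentMap<String, Semaphore> byKey = new ConcurrentHashMap<>();
    private final int maxConcurrent;

    FallbackSemaphores(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    // Creates the semaphore at most once per key, even when several threads ask concurrently.
    Semaphore forKey(String commandKey) {
        return byKey.computeIfAbsent(commandKey, k -> new Semaphore(maxConcurrent));
    }
}
```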

Fallback source analysis summary

Hystrix's fallbacks fall into five main types:

  • SEMAPHORE_REJECTED corresponds to handleSemaphoreRejectionViaFallback
  • SHORT_CIRCUITED corresponds to handleShortCircuitViaFallback
  • THREAD_POOL_REJECTED corresponds to handleThreadPoolRejectionViaFallback
  • TIMEOUT corresponds to handleTimeoutViaFallback
  • FAILURE corresponds to handleFailureViaFallback
  • All of these methods end up calling getFallbackOrThrowException.
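When fallback is enabled, the outcomes of getFallbackOrThrowException can be summarized as a small function. FallbackRunner and its Outcome values are illustrative; FALLBACK_REJECTED in particular is a made-up name. The sketch returns an outcome instead of emitting a HystrixRuntimeException. It relies on HystrixCommand's default getFallback() throwing UnsupportedOperationException, which is how a missing fallback is detected.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical summary of the fallback outcomes, not Hystrix code.
class FallbackRunner {
    enum Outcome { FALLBACK_SUCCESS, FALLBACK_MISSING, FALLBACK_FAILURE, FALLBACK_REJECTED }

    static final class Result<T> {
        final Outcome outcome;
        final T value; // only set for FALLBACK_SUCCESS

        Result(Outcome outcome, T value) {
            this.outcome = outcome;
            this.value = value;
        }
    }

    // Runs the fallback under a permit so slow fallbacks cannot pile up unbounded.
    static <T> Result<T> run(Semaphore fallbackPermits, Supplier<T> fallback) {
        if (!fallbackPermits.tryAcquire()) {
            return new Result<>(Outcome.FALLBACK_REJECTED, null); // too many concurrent fallbacks
        }
        try {
            return new Result<>(Outcome.FALLBACK_SUCCESS, fallback.get());
        } catch (UnsupportedOperationException e) {
            return new Result<>(Outcome.FALLBACK_MISSING, null);  // no getFallback() implemented
        } catch (RuntimeException e) {
            return new Result<>(Outcome.FALLBACK_FAILURE, null);  // the fallback itself failed
        } finally {
            fallbackPermits.release();
        }
    }
}
```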