1. Hystrix profile

In a distributed environment, some of the many service dependencies will inevitably fail. Hystrix is a library that helps you control the interaction between these distributed services by adding delay tolerance and fault tolerance logic. Hystrix does this by isolating access points between services, stopping cascading failures between them, and providing callback options, all of which improve the overall resiliency of the system.

2. Hystrix target

Hystrix is designed to do the following

  • Protects and controls delays and failures of library access dependencies through third-party customers, usually over a network.
  • Stop cascading failures in complex distributed systems
  • Quick failures and quick recoveries.
  • Step back and downgrade gracefully when possible.
  • Enables near real-time monitoring, alerts, and operational control.

3. What problem does Hystrix solve

In a complex distributed architecture, an application has dozens of dependencies, each of which will inevitably fail at some point. If the host application is not isolated from these external failures, it risks being shut down.

For example, for an application that relies on 30 services, each with 99.99% uptime, you can expect the following:

Even if all of your dependencies worked well, you could expect to be out of use for at least two hours per month.Copy the code

The reality is usually worse.

Even if all dependencies work well, if the entire system is not designed to be resilient, the total impact of even a 0.01% outage on dozens of services is equivalent to the potential number of hours of downtime per month.

When one of many back-end systems becomes a potential system, it can block the entire user request. As traffic increases, a potential back-end dependency causes all resources on all servers to saturate within seconds.

Every point in the application that passes over the network or enters the client repository, which may result in a network request, is a source of potential failure. Worse than failures, these applications can cause increased latency between services, leading to queuing, threads, and other system resources, and even more cascading failures in the system.

These problems are exacerbated when network access is performed through third-party clients. A third-party client is a black box in which implementation details are hidden and subject to change, with different network or resource configurations for each client library and often difficult to monitor and change.

Even worse are pass-through dependencies, which perform potentially expensive or error-prone network calls that the application does not explicitly invoke.

The network connection fails or is degraded. Services and servers are down or slow. New library or service deployments can change behavior or performance characteristics. The client library is defective.

All of these representations of failures and delays need to be isolated and managed so that a single dependency failure does not cause the entire application or system to crash.

4. Design principles

  • Prevent any single dependency from depleting all container (such as Tomcat) user threads.
  • Reduce load and fail quickly instead of queuing.
  • Provide callbacks wherever feasible to protect users from failure.
  • Use isolation techniques to limit the impact of any one dependency.
  • Optimize discovery times with near-real-time measurement, monitoring, and alerts
  • In most aspects of Hystrix, recovery time is optimized by low-latency propagation of configuration changes and support for dynamic property changes, which allows you to make real-time operational changes using low-latency feedback loops.
  • Prevent the dependency client from failing throughout execution, not just during network traffic.

5. How does Hystrix achieve its goals

Hystrix does this

  • Wrap all calls to external systems (or dependencies) in HystrixCommand or HystrixObservableCommand objects, usually executed in a separate thread
  • Timeout calls that take longer than the threshold you defined. There is a default value, but for most dependencies, you can customize these timeouts through the property to make them slightly higher than a 99.5 percentage of performance per dependency.
  • Maintain a small thread pool (or semaphore) for each dependency. If the dependency is full, requests for the dependency are rejected immediately, rather than queued.
  • Measure the number of successes, failures (exceptions thrown by the client), timeouts, and thread rejections.
  • Fuse to manually or automatically stop all requests to a particular service over a period of time if the percentage of errors for that service exceeds a threshold.
  • Rollback logic is performed when a request fails, is rejected, times out, or is short-circuited
  • Monitor metrics and configuration changes in near real time.

When you use Hystrix to wrap each of the underlying dependencies, the architecture shown in the figure above changes as shown in the figure below. Each dependency is isolated, limits resource saturation when delays occur, and is overridden in fallback logic that determines what should be done in response to any type of failure in a dependency

6. Hystrix-go source code analysis

Hystrix is a Netflix open-source JAVA project, and Hystrix-Go is an implementation of Golang

Execute code as Hystrix command

Define the application logic that depends on the external system and pass the function to Hystrix.go. This is the only thing executed when the system is fine.

hystrix.Go("my_command", func() error {
	// talk to other services
	return nil
}, nil)
Copy the code

The prototype of the Go function is to run the function you specify to track the health of the function. If the function starts to slow down or fails multiple times, we block new calls to give the service time to repair. If you need to execute some additional code during an interrupt, you can specify a custom callback function.

func Go(name string, run runFunc, fallback fallbackFunc) chan error { runC := func(ctx context.Context) error { return run() } var fallbackC fallbackFuncC  if fallback ! = nil { fallbackC = func(ctx context.Context, err error) error { return fallback(err) } } return GoC(context.Background(), name, runC, fallbackC) }Copy the code

The above function is used to execute functions asynchronously. If synchronous execution is required, call the Do method directly. Do runs the function you specify in synchronous mode, blocking until the function succeeds, or returns an error, including hystrix’s circuit breaker error.

func Do(name string, run runFunc, fallback fallbackFunc) error { runC := func(ctx context.Context) error { return run() } var fallbackC fallbackFuncC if fallback ! = nil { fallbackC = func(ctx context.Context, err error) error { return fallback(err) } } return DoC(context.Background(), name, runC, fallbackC) }Copy the code

To make it easier to understand, some Do methods call Doc, which is the Cotext version of the Do function.

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error { done := make(chan struct{}, 1) r := func(ctx context.Context) error { err := run(ctx) if err ! = nil { return err } done <- struct{}{} return nil } f := func(ctx context.Context, e error) error { err := fallback(ctx, e) if err ! = nil { return err } done <- struct{}{} return nil } var errChan chan error if fallback == nil { errChan = GoC(ctx, name, r, nil) } else { errChan = GoC(ctx, name, r, f) } select { case <-done: return nil case err := <-errChan: return err } }Copy the code

This function is divided into several parts, redefining the specified run function redefining the callback function the above logic is mainly added to write done chan after a successful call to facilitate DoC to check whether the execution is complete

The next thing to notice is the call to the GoC function, which is the main call logic for the Go function. So that’s the most important thing

The last part of the DoC checks whether done chan or errChan has data, depending on whether the specified function logic executed successfully or if an error occurred.

DoC is that simple, and it fits our way of thinking very well.

So the important thing is, GoC is Go and it adds the Go function to the Context, and we need to look at the logic in detail

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error { cmd := &command{ run: run, fallback: fallback, start: time.Now(), errChan: make(chan error, 1), finished: make(chan bool, 1), } // dont have methods with explicit params and returns // let data come in and out naturally, like with any closure // explicit error return to give place for us to kill switch the operation (fallback) circuit, _, err := GetCircuit(name) if err ! = nil { cmd.errChan <- err return cmd.errChan } cmd.circuit = circuit ticketCond := sync.NewCond(cmd) ticketChecked := false // When the caller extracts error from returned errChan, it's assumed that // the ticket's been returned to executorPool. Therefore, returnTicket() can // not run after cmd.errorWithFallback(). returnTicket := func() { cmd.Lock() // Avoid releasing before a ticket is acquired. for ! ticketChecked { ticketCond.Wait() } cmd.circuit.executorPool.Return(cmd.ticket) cmd.Unlock() } // Shared by the following two goroutines. It ensures only the faster // goroutine runs errWithFallback() and reportAllEvent(). returnOnce := &sync.Once{} reportAllEvent := func() { err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration) if err ! = nil { log.Printf(err.Error()) } } go func() { defer func() { cmd.finished <- true }() // Circuits get opened when recent executions have shown to have a high error rate. // Rejecting new executions allows backends to recover, and the circuit will allow // new traffic when it feels a healthly state has returned. if ! cmd.circuit.AllowRequest() { cmd.Lock() // It's safe for another goroutine to go ahead releasing a nil ticket. ticketChecked = true ticketCond.Signal() cmd.Unlock() returnOnce.Do(func() { returnTicket() cmd.errorWithFallback(ctx, ErrCircuitOpen) reportAllEvent() }) return } // As backends falter, requests take longer but don't always fail. // // When requests slow down but the incoming rate of requests stays the same, you have to // run more at a time to keep up. By controlling concurrency during these situations, you can // shed load which accumulates due to the increasing ratio of active commands to incoming requests. cmd.Lock() select { case cmd.ticket = <-circuit.executorPool.Tickets: ticketChecked = true ticketCond.Signal() cmd.Unlock() default: ticketChecked = true ticketCond.Signal() cmd.Unlock() returnOnce.Do(func() { returnTicket() cmd.errorWithFallback(ctx, ErrMaxConcurrency) reportAllEvent() }) return } runStart := time.Now() runErr := run(ctx) returnOnce.Do(func() { defer reportAllEvent() cmd.runDuration = time.Since(runStart) returnTicket() if runErr ! = nil { cmd.errorWithFallback(ctx, runErr) return } cmd.reportEvent("success") }) }() go func() { timer := time.NewTimer(getSettings(name).Timeout) defer timer.Stop() select { case <-cmd.finished: // returnOnce has been executed in another goroutine case <-ctx.Done(): returnOnce.Do(func() { returnTicket() cmd.errorWithFallback(ctx, ctx.Err()) reportAllEvent() }) return case <-timer.C: returnOnce.Do(func() { returnTicket() cmd.errorWithFallback(ctx, ErrTimeout) reportAllEvent() }) return } }() return cmd.errChan }Copy the code

The GoC function first instantiates CMD, which is used by the run on the fuse. Often used to describe the run and fallback functions on fuses.

ErrChan Is used to record function or fuse errors. It is a chan finished with a cache to indicate whether the chan is finished

Then GetCircuit gets the circuit from circuitBreakers based on the parameter name. If you can’t get a newCircuitBreaker, create one and put it in The circuitBreakers. Because this operation needs to be thread-safe, a two-lock mechanism is added

If an error is reported, the error is written to errChan, then return

The circuit property assigned to CMD after circuit is obtained.

Next, a Cond is created from CMD to returnTicket. In the temporary variable of the returnTicket function,

returnTicket := func() { cmd.Lock() // Avoid releasing before a ticket is acquired. for ! ticketChecked { ticketCond.Wait() } cmd.circuit.executorPool.Return(cmd.ticket) cmd.Unlock() }Copy the code

This is also a Cond best practice in Sync: lock, detect conditions, perform logic, unlock.

Then create a sync.once for the next two Goroutines. Ensure that only the fastest goroutine runs the errWithFallback() and reportAllEvent functions

The reportAllEvent function is used to report events.

The next two Goroutines are used to detect various states and perform actions in those states.

At the end of the first Goroutine, defer writes to the Finished pipe.

It then determines whether the fuse is on, and if the latest execution has a high error rate, it rejects new requests and allows the back end to resume until it feels more normal

If access is not currently allowed, ticketChecked=true and Cond Singal() performs returnOnce. Then return directly. This time the goroutine ends.

If the current environment is generous and running traffic flows in, you can either obtain ticket from the executorPool, or perform the logic in default in select because you cannot obtain ticket, which is similar to the fuses shown above. Except the error message in errWithFallback is ErrMaxConcurrency. Identification has too much parallel logic execution.

If I get ticket. Now we can run our run function. We can calculate the runDuration of the run function in the returnOnce, depending on the start and end time

According to the return value of the run function, success or error information is reported.

That’s all the logic of the first Goroutine. After everything is done, write cmd.finished to indicate that the logic of the fuse wrap is complete.

The following goroutine is mainly used to determine context controls and timeout controls, as well as to end directly according to cmd.finished

In this Goroutine, we get the Timeout Timeout setting based on our setting to define a timer

Select which case succeeds to execute the logic. All branches except cmD. finished need to be reported.

That’s all the logic in the GoC.

We found that in each reporting logic, the ticket was returned first, followed by the errorWithFallback and reportAllEvent according to the different states

func (c *command) errorWithFallback(ctx context.Context, err error) { eventType := "failure" if err == ErrCircuitOpen { eventType = "short-circuit" } else if err == ErrMaxConcurrency { eventType = "rejected" } else if err == ErrTimeout { eventType = "timeout" } else if err == context.Canceled { eventType = "context_canceled" } else if err == context.DeadlineExceeded { eventType = "context_deadline_exceeded" } c.reportEvent(eventType) fallbackErr := c.tryFallback(ctx, err) if fallbackErr ! = nil { c.errChan <- fallbackErr } }Copy the code

ErrorWithFallback determines the time type based on our err parameter

func (c *command) tryFallback(ctx context.Context, err error) error { if c.fallback == nil { // If we don't have a fallback return the original error. return err } fallbackErr := c.fallback(ctx, err) if fallbackErr ! = nil { c.reportEvent("fallback-failure") return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err) } c.reportEvent("fallback-success") return nil }Copy the code

If c.fallback is nil, err is directly returned. If fallback is not empty, fallback is called to check whether the returned value is nil. If it is not empty, err is reported. If the fallback is empty, fallback succeeds

func (c *command) reportEvent(eventType string) {
	c.Lock()
	defer c.Unlock()

	c.events = append(c.events, eventType)
}

Copy the code

To report an event, add a string identifying the event type to the Events list in the CMD

// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard. func (circuit  *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error { if len(eventTypes) == 0 { return fmt.Errorf("no event types sent for metrics") } circuit.mutex.RLock() o := circuit.open circuit.mutex.RUnlock() if eventTypes[0] == "success" && o { circuit.setClose() } var concurrencyInUse float64 if circuit.executorPool.Max > 0 { concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max) } select { case circuit.metrics.Updates <- &commandExecution{ Types: eventTypes, Start: start, RunDuration: runDuration, ConcurrencyInUse: concurrencyInUse, }: default: return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)} } return nil }Copy the code

When reporting events, the reportAllEvent function posts cmd.events as part of a commanExecution to Circuit.metrics.Updates otherwise reports metrics chann full.

This is not the end of the story, so let’s insert some of the configuration before we move on

In general, the hystrix.configureCommand () method can be called to adjust the Settings for each command at application startup time.

hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
	Timeout:               1000,
	MaxConcurrentRequests: 100,
	ErrorPercentThreshold: 25,
})
Copy the code

Hystrix.configure () can also be used to accept a map[string]CommandConfig

Let’s start by looking at the init() method in settings.go

var circuitSettings map[string]*Settings
var settingsMutex *sync.RWMutex
var log logger

func init() {
	circuitSettings = make(map[string]*Settings)
	settingsMutex = &sync.RWMutex{}
	log = DefaultLogger
}

Copy the code

Initialization of global variables. CircuitSettings is the setting of the fuse. The key of the field is the name of each CMD, and the value is Settings, as follows:

type Settings struct {
	Timeout                time.Duration
	MaxConcurrentRequests  int
	RequestVolumeThreshold uint64
	SleepWindow            time.Duration
	ErrorPercentThreshold  int
}
Copy the code

The above configuration items have default values

var ( // DefaultTimeout is how long to wait for command to complete, in milliseconds DefaultTimeout = 1000 // DefaultMaxConcurrent is how many commands of the same type can run at the same time DefaultMaxConcurrent = 10 // DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health DefaultVolumeThreshold = 20 // DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery DefaultSleepWindow = 5000 // DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests DefaultErrorPercentThreshold  = 50 // DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing. DefaultLogger = NoopLogger{} )Copy the code
  • Timeout: indicates the Timeout period for executing CMD. The default value is 1000
  • MaxConcurrentRequests: specifies the maximum number of concurrent requests. The default value is 10
  • RequestVolumeThreshold: Minimum number of requests for fuses to be opened due to a health condition. The default value is 20
  • SleepWindow: how long to wait for the fuse to recover after it is turned on, in milliseconds. The default value is 5s
  • ErrorPercentThreshold: Once the number of error measures exceeds the specified percentage, the fuse will be turned on. The default is 50%

If the preceding configuration items are not configured, the default configuration items are used.

After configuration, let’s take a look at how our reported information is used. As we said above, the reported information is put into circuit. Metrics.Updates. So who’s going to handle it?

To understand this, you need to know what metrics do. First each circuit is created for each ExecutorPool to track whether requests should be allowed, or rejected, if the fuse health is low

type CircuitBreaker struct {
	Name                   string
	open                   bool
	forceOpen              bool
	mutex                  *sync.RWMutex
	openedOrLastTestedTime int64

	executorPool *executorPool
	metrics      *metricExchange
}
Copy the code

Cmd.allowrequst () is used to determine whether to allow new requests

// AllowRequest is checked before a command executes, ensuring that circuit state and metric health allow it. // When the circuit is open, this call will occasionally return true to measure whether the external service // has recovered. func (circuit *CircuitBreaker) AllowRequest() bool { return ! circuit.IsOpen() || circuit.allowSingleTest() } func (circuit *CircuitBreaker) allowSingleTest() bool { circuit.mutex.RLock() defer circuit.mutex.RUnlock() now := time.Now().UnixNano() openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime) if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() { swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now) if swapped { log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name) } return swapped } return false }Copy the code

As you can see, it will not only judge Isopen(), but also allowSingleTest, which will test whether the fuse needs to be closed by releasing a portion of the test requests according to the time window when the fuse Isopen

The Metrics property is a metricExchange structure

type metricExchange struct {
	Name    string
	Updates chan *commandExecution
	Mutex   *sync.RWMutex

	metricCollectors []metricCollector.MetricCollector
}
Copy the code

Updates are used to record reported data. MetricCollectors is a list of metricCollectors. A MetricCollector is an interface type that represents the contracts that all collectors must comply with in order to collect fusing circuit statistics. The implementation of this interface does not have to maintain locks around its data store as long as it is not modified outside of the Hystrix Context context. The core method is

type MetricCollector interface {
	Update(MetricResult)
	Reset()
}

Copy the code

Update takes a set of metrics Reset from a command execution to restart internal counters and timers.

Let’s first look at how we instantiate this structure, okay

func newMetricExchange(name string) *metricExchange { m := &metricExchange{} m.Name = name m.Updates = make(chan *commandExecution, 2000) m.Mutex = &sync.RWMutex{} m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name) m.Reset()  go m.Monitor() return m }Copy the code

In the initialization m.m etricCollectors properties, call the Registry Registry is a default metricCollectorRegistry InitializeMetricCollectors method, Fuse is used to collect data about the health of the fuse and InitializeMetricCollectors method run registered MetricCollector initialization, create a MetricCollectors array

var Registry = metricCollectorRegistry{
	lock: &sync.RWMutex{},
	registry: []func(name string) MetricCollector{
		newDefaultMetricCollector,
	},
}

type metricCollectorRegistry struct {
	lock     *sync.RWMutex
	registry []func(name string) MetricCollector
}

func (m *metricCollectorRegistry) InitializeMetricCollectors(name string) []MetricCollector {
	m.lock.RLock()
	defer m.lock.RUnlock()

	metrics := make([]MetricCollector, len(m.registry))
	for i, metricCollectorInitializer := range m.registry {
		metrics[i] = metricCollectorInitializer(name)
	}
	return metrics
}
Copy the code

See newDefaultMetricCollector, this is our default collector

func newDefaultMetricCollector(name string) MetricCollector { m := &DefaultMetricCollector{} m.mutex = &sync.RWMutex{} m.Reset() return m } type DefaultMetricCollector struct { mutex *sync.RWMutex numRequests *rolling.Number errors *rolling.Number successes *rolling.Number failures *rolling.Number rejects *rolling.Number shortCircuits *rolling.Number  timeouts *rolling.Number contextCanceled *rolling.Number contextDeadlineExceeded *rolling.Number fallbackSuccesses *rolling.Number fallbackFailures *rolling.Number totalDuration *rolling.Timing runDuration *rolling.Timing }Copy the code

The default collector for DefaultMetricCollector holds the status information for fuses, and the type of each item is a Rolling.Number


type Number struct {
	Buckets map[int64]*numberBucket
	Mutex   *sync.RWMutex
}

type numberBucket struct {
	Value float64
}

Copy the code

Number Tracks numberBucket within a certain period of time. The current time segment is one second long and only the last 10 seconds are reserved. That is, each numberBucket records the current number of seconds and Buckets’ key is the current number of seconds

At the end, a monitor is turned on,

func (m *metricExchange) Monitor() {
	for update := range m.Updates {
		// we only grab a read lock to make sure Reset() isn't changing the numbers.
		m.Mutex.RLock()

		totalDuration := time.Since(update.Start)
		wg := &sync.WaitGroup{}
		for _, collector := range m.metricCollectors {
			wg.Add(1)
			go m.IncrementMetrics(wg, collector, update, totalDuration)
		}
		wg.Wait()

		m.Mutex.RUnlock()
	}
}

Copy the code

In monitoring, the collector list is iterated over and IncrementMetrics is asynchronously executed to update the associated data item store


func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
	// granular metrics
	r := metricCollector.MetricResult{
		Attempts:         1,
		TotalDuration:    totalDuration,
		RunDuration:      update.RunDuration,
		ConcurrencyInUse: update.ConcurrencyInUse,
	}

	switch update.Types[0] {
	case "success":
		r.Successes = 1
	case "failure":
		r.Failures = 1
		r.Errors = 1
	case "rejected":
		r.Rejects = 1
		r.Errors = 1
	case "short-circuit":
		r.ShortCircuits = 1
		r.Errors = 1
	case "timeout":
		r.Timeouts = 1
		r.Errors = 1
	case "context_canceled":
		r.ContextCanceled = 1
	case "context_deadline_exceeded":
		r.ContextDeadlineExceeded = 1
	}

	if len(update.Types) > 1 {
		// fallback metrics
		if update.Types[1] == "fallback-success" {
			r.FallbackSuccesses = 1
		}
		if update.Types[1] == "fallback-failure" {
			r.FallbackFailures = 1
		}
	}

	collector.Update(r)

	wg.Done()
}

Copy the code

Perform an update of the default collector to increase the value of the stored item

func (d *DefaultMetricCollector) Update(r MetricResult) {
	d.mutex.RLock()
	defer d.mutex.RUnlock()

	d.numRequests.Increment(r.Attempts)
	d.errors.Increment(r.Errors)
	d.successes.Increment(r.Successes)
	d.failures.Increment(r.Failures)
	d.rejects.Increment(r.Rejects)
	d.shortCircuits.Increment(r.ShortCircuits)
	d.timeouts.Increment(r.Timeouts)
	d.fallbackSuccesses.Increment(r.FallbackSuccesses)
	d.fallbackFailures.Increment(r.FallbackFailures)
	d.contextCanceled.Increment(r.ContextCanceled)
	d.contextDeadlineExceeded.Increment(r.ContextDeadlineExceeded)

	d.totalDuration.Add(r.TotalDuration)
	d.runDuration.Add(r.RunDuration)
}
Copy the code

Update rollings.Number Increment operation performed when the rollings.Number value is null. If it does not, it creates a new lock. If it already exists, it will add the current time to the parameter and then delete the old data before (10s).

func (r *Number) getCurrentBucket() *numberBucket { now := time.Now().Unix() var bucket *numberBucket var ok bool if bucket, ok = r.Buckets[now]; ! ok { bucket = &numberBucket{} r.Buckets[now] = bucket } return bucket } func (r *Number) Increment(i float64) { if i == 0 { return } r.Mutex.Lock() defer r.Mutex.Unlock() b := r.getCurrentBucket() b.Value += i r.removeOldBuckets() } func (r  *Number) removeOldBuckets() { now := time.Now().Unix() - 10 for timestamp := range r.Buckets { // TODO: configurable rolling window if timestamp <= now { delete(r.Buckets, timestamp) } } }Copy the code

Rolling.Timing is the recording structure used to record the running time of a single execution record and the total time

type Timing struct {
	Buckets map[int64]*timingBucket
	Mutex   *sync.RWMutex

	CachedSortedDurations []time.Duration
	LastCachedTime        int64
}

type timingBucket struct {
	Durations []time.Duration
}
Copy the code

This is very similar to rolling.Number logic, which maintains the duration of each time period periodically (how long has passed from Start to report data). The durations are stored in CachedSortedDurations, an array of sorted, 1-minute records to allow for various statistics to be calculated from the source data.

Finally, how to display these metrics in a more intuitive way, just as Pprof has a web page to view current application performance metrics, Hystrix has a dashboard to view metrics

Just add to our program

hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
Copy the code

This is actually similar to pprof in that a new Goroutine starts a Web service.

This logic is essentially a loop

func (sh *StreamHandler) loop() {
	tick := time.Tick(1 * time.Second)
	for {
		select {
		case <-tick:
			circuitBreakersMutex.RLock()
			for _, cb := range circuitBreakers {
				sh.publishMetrics(cb)
				sh.publishThreadPools(cb.executorPool)
			}
			circuitBreakersMutex.RUnlock()
		case <-sh.done:
			return
		}
	}
}
Copy the code

Data is collected every second and then aggregated

func (sh *StreamHandler) publishMetrics(cb *CircuitBreaker) error { now := time.Now() reqCount := cb.metrics.Requests().Sum(now) errCount := cb.metrics.DefaultCollector().Errors().Sum(now) errPct := cb.metrics.ErrorPercent(now) eventBytes, err := json.Marshal(&streamCmdMetric{ Type: "HystrixCommand", Name: cb.Name, Group: cb.Name, Time: currentTime(), ReportingHosts: 1, RequestCount: uint32(reqCount), ErrorCount: uint32(errCount), ErrorPct: uint32(errPct), CircuitBreakerOpen: cb.IsOpen(), RollingCountSuccess: uint32(cb.metrics.DefaultCollector().Successes().Sum(now)), RollingCountFailure: uint32(cb.metrics.DefaultCollector().Failures().Sum(now)), RollingCountThreadPoolRejected: uint32(cb.metrics.DefaultCollector().Rejects().Sum(now)), RollingCountShortCircuited: uint32(cb.metrics.DefaultCollector().ShortCircuits().Sum(now)), RollingCountTimeout: uint32(cb.metrics.DefaultCollector().Timeouts().Sum(now)), RollingCountFallbackSuccess: uint32(cb.metrics.DefaultCollector().FallbackSuccesses().Sum(now)), RollingCountFallbackFailure: uint32(cb.metrics.DefaultCollector().FallbackFailures().Sum(now)), LatencyTotal: generateLatencyTimings(cb.metrics.DefaultCollector().TotalDuration()), LatencyTotalMean: cb.metrics.DefaultCollector().TotalDuration().Mean(), LatencyExecute: generateLatencyTimings(cb.metrics.DefaultCollector().RunDuration()), LatencyExecuteMean: cb.metrics.DefaultCollector().RunDuration().Mean(), // TODO: all hard-coded values should become configurable settings, per circuit RollingStatsWindow: 10000, ExecutionIsolationStrategy: "THREAD", CircuitBreakerEnabled: true, CircuitBreakerForceClosed: false, CircuitBreakerForceOpen: cb.forceOpen, CircuitBreakerErrorThresholdPercent: uint32(getSettings(cb.Name).ErrorPercentThreshold), CircuitBreakerSleepWindow: uint32(getSettings(cb.Name).SleepWindow.Seconds() * 1000), CircuitBreakerRequestVolumeThreshold: uint32(getSettings(cb.Name).RequestVolumeThreshold), }) if err ! = nil { return err } err = sh.writeToRequests(eventBytes) if err ! = nil { return err } return nil }Copy the code

Write the summarized data to requests, which is a map. The key is http.Request, and the value is our summary data

For more dashboard content, see Hystrix-Dashboard

In terms of flow control, Hystrix uses token-based flow control.

type executorPool struct {
	Name    string
	Metrics *poolMetrics
	Max     int
	Tickets chan *struct{}
}

func newExecutorPool(name string) *executorPool {
	p := &executorPool{}
	p.Name = name
	p.Metrics = newPoolMetrics(name)
	p.Max = getSettings(name).MaxConcurrentRequests

	p.Tickets = make(chan *struct{}, p.Max)
	for i := 0; i < p.Max; i++ {
		p.Tickets <- &struct{}{}
	}

	return p
}

func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}

	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}
	p.Tickets <- ticket
}

Copy the code

When the number of concurrent requests is less than the maximum, Tickets can be fetched directly from Tickets and returned when they are used up. We also noticed in the GoC that if no more tickets are available, return the ticket and report the ErrMaxConcurrency error.