# Circuit Breaker Framework for Microservice Architecture: hystrix-go

Background

With the growing popularity of microservice architecture, several concepts have come to the fore. Two phrases are inseparable from microservices: high cohesion and low coupling. Achieving them is the ultimate goal of microservice architecture design. In a microservice architecture, each microservice implements a single business function and can evolve independently. An application may be composed of multiple microservices that exchange data through remote calls, so a microservice architecture forms dependency relationships like this:

Microservice A calls microservices C and D, microservice B relies on microservices B and E, and microservice D depends on service F. This is only a simple example; the dependencies between services in a real business system are more complex. If an upstream service (named by invocation relationship) fails or responds slowly in such a call chain, its callers hold on to more and more system resources, which can eventually crash the system. This is the avalanche effect of microservices.

To counter the avalanche effect, the circuit breaker mechanism was proposed to protect microservice call chains. We are all familiar with circuit breakers: the fuse in an electrical circuit is one. So what is the circuit breaker mechanism in microservices?

When a microservice in the call chain is unavailable or its response time is too long, the service is degraded: calls to that node's microservice are short-circuited and an error response is returned quickly. When calls to that node's microservice respond normally again, the call chain is restored.

In this article, we introduce an open-source circuit breaker framework: hystrix-go.

Circuit breaker framework (hystrix-go)

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services, and third-party services, stop cascading failures, and enable resilience in complex distributed systems where failure is inevitable. hystrix-go aims to let Go programmers easily build applications with execution semantics similar to those of the Java-based Hystrix library. This article covers hystrix-go from usage to source analysis.

Quick installation

go get -u github.com/afex/hystrix-go/hystrix

Quick start

hystrix-go works out of the box and is relatively easy to use. There are two main steps:

  • Configure the circuit breaker rules; otherwise the default configuration is used. The methods available are:
func Configure(cmds map[string]CommandConfig)
func ConfigureCommand(name string, config CommandConfig)

Configure calls ConfigureCommand internally for each entry in the map; only the parameters differ.

  • Define the application logic that depends on external systems (runFunc) and the logic executed while the service is interrupted (fallbackFunc). The methods available are:
func Go(name string, run runFunc, fallback fallbackFunc) chan error // calls GoC internally
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error
func Do(name string, run runFunc, fallback fallbackFunc) error // calls DoC internally
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error // calls GoC internally and handles the asynchronous process

The difference between Go and Do is that Go is asynchronous and Do is synchronous. Do handles the asynchronous process inside DoC, and both ultimately call GoC. We will analyze this later.

As an example, we add an interface-level circuit breaker middleware to the Gin framework:

// The code has been uploaded to GitHub
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"os"

	"github.com/afex/hystrix-go/hystrix"
	"github.com/gin-gonic/gin"
)

var CircuitBreakerName = "api_%s_circuit_breaker"

var (
	filename = "circuit_breaker.log" // placeholder: the original does not show the log file name
	f        *os.File
	errfile  error
)

func CircuitBreakerWrapper(ctx *gin.Context) {
	name := fmt.Sprintf(CircuitBreakerName, ctx.Request.URL)
	hystrix.Do(name, func() error {
		ctx.Next()
		code := ctx.Writer.Status()
		if code != http.StatusOK {
			return fmt.Errorf("status code %d", code)
		}
		return nil

	}, func(err error) error {
		if err != nil {
			// Monitoring report (not implemented)
			_, _ = io.WriteString(f, fmt.Sprintf("circuitBreaker and err is %s\n", err.Error())) // write to the log file
			fmt.Printf("circuitBreaker and err is %s\n", err.Error())
			// Return the circuit breaker error
			ctx.JSON(http.StatusServiceUnavailable, gin.H{
				"msg": err.Error(),
			})
		}
		return nil
	})
}

func init() {
	// Register the rules under the concrete command name that CircuitBreakerWrapper builds
	// for this route; the raw "api_%s_circuit_breaker" template would never match a request.
	hystrix.ConfigureCommand(fmt.Sprintf(CircuitBreakerName, "/api/ping/baidu"), hystrix.CommandConfig{
		Timeout:                3000, // timeout for executing a command, in milliseconds (3s)
		MaxConcurrentRequests:  10,   // maximum number of concurrent commands
		RequestVolumeThreshold: 100,  // number of requests in the 10-second window that must be reached before the breaker is evaluated
		SleepWindow:            2000, // once the breaker is open, how long to wait (in milliseconds) before testing whether the service is available
		ErrorPercentThreshold:  20,   // error percentage; the breaker opens when the request count reaches RequestVolumeThreshold and the error rate reaches this value
	})
	if checkFileIsExist(filename) { // the file exists
		f, errfile = os.OpenFile(filename, os.O_APPEND|os.O_WRONLY, 0666) // open the file for appending
	} else {
		f, errfile = os.Create(filename) // create the file
	}
	if errfile != nil {
		panic(errfile)
	}
}

func main() {
	defer f.Close()
	hystrixStreamHandler := hystrix.NewStreamHandler()
	hystrixStreamHandler.Start()
	go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
	r := gin.Default()
	r.GET("/api/ping/baidu", func(c *gin.Context) {
		resp, err := http.Get("https://www.baidu.com")
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()})
			return
		}
		resp.Body.Close()
		c.JSON(http.StatusOK, gin.H{"msg": "success"})
	}, CircuitBreakerWrapper)
	r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}

func checkFileIsExist(filename string) bool {
	if _, err := os.Stat(filename); os.IsNotExist(err) {
		return false
	}
	return true
}

Load-test command: wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/ping/baidu

Running results:

circuitBreaker and err is status code 500
circuitBreaker and err is status code 500
.....
circuitBreaker and err is hystrix: max concurrency
circuitBreaker and err is hystrix: max concurrency
.....
circuitBreaker and err is hystrix: circuit open
circuitBreaker and err is hystrix: circuit open
.....

Analysis of errors:

  • circuitBreaker and err is status code 500: the request got no response because we disconnected the network.
  • circuitBreaker and err is hystrix: max concurrency: we set the maximum concurrency MaxConcurrentRequests to 10, while our load-testing tool uses 100 concurrent connections, so this limit is triggered.
  • circuitBreaker and err is hystrix: circuit open: we set the request-count threshold RequestVolumeThreshold to 100, so once the number of requests within 10s exceeds 100 and the error rate reaches ErrorPercentThreshold, the circuit breaker trips.

Here is a quick explanation of the example above:

  • Add an interface-level circuit breaker middleware.
  • Initialize the circuit breaker configuration.
  • Start the dashboard to visualize the hystrix report information; open http://localhost:81 in a browser to see the reported results.

hystrix-go process analysis

I originally wanted to analyze the source code in full, but there is quite a lot of it, so we follow the execution flow instead and look at the core code along the way.

Configure circuit breaker rules

We can call either of two methods to configure the circuit breaker rules, and both end up in ConfigureCommand. There is no special logic here: any field we do not configure falls back to its default value, defined as follows:

var (
	// DefaultTimeout is how long to wait for command to complete, in milliseconds
	DefaultTimeout = 1000
	// DefaultMaxConcurrent is how many commands of the same type can run at the same time
	DefaultMaxConcurrent = 10
	// DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
	DefaultVolumeThreshold = 20
	// DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
	DefaultSleepWindow = 5000
	// DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
	DefaultErrorPercentThreshold = 50
	// DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing.
	DefaultLogger = NoopLogger{}
)

The configuration rules are as follows:

  • Timeout: the timeout for executing a command, in ms; the default is 1000ms.
  • MaxConcurrentRequests: the maximum number of commands that can run concurrently; the default is 10.
  • SleepWindow: once the circuit breaker is open, SleepWindow controls how long to wait before testing whether the service is available again; the default is 5000ms.
  • RequestVolumeThreshold: one of the conditions for opening the circuit breaker. It counts the requests within 10s (hard-coded); only after this number of requests is reached is the error rate used to decide whether to open the breaker. The default is 20.
  • ErrorPercentThreshold: one of the conditions for opening the circuit breaker. When the number of requests is greater than or equal to RequestVolumeThreshold and the error rate reaches this percentage, the circuit breaker opens. The default is 50.
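The way RequestVolumeThreshold and ErrorPercentThreshold combine can be sketched with a little arithmetic. This is a minimal illustration, not the library's code: the function names errorPercent and shouldTrip are made up for this example, and the rounding is an assumption.

```go
package main

import "fmt"

// errorPercent returns the share of failed requests as a rounded percentage.
func errorPercent(requests, failed uint64) int {
	if requests == 0 {
		return 0
	}
	return int(float64(failed)/float64(requests)*100 + 0.5)
}

// shouldTrip applies both conditions: enough requests in the window,
// and an error rate that has reached the configured percentage.
func shouldTrip(requests, failed, volumeThreshold uint64, errorPercentThreshold int) bool {
	if requests < volumeThreshold {
		return false // too little traffic to judge health
	}
	return errorPercent(requests, failed) >= errorPercentThreshold
}

func main() {
	// Default rules from the list above: 20 requests, 50 percent.
	fmt.Println(shouldTrip(19, 19, 20, 50)) // below the volume threshold
	fmt.Println(shouldTrip(20, 10, 20, 50)) // 50% errors at the volume threshold
	fmt.Println(shouldTrip(40, 10, 20, 50)) // 25% errors
}
```

With the defaults, 19 failing requests alone do not open the breaker, because the volume condition is checked first.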

These rules are stored in a map based on the command name.
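To make the "map keyed by command name" idea concrete, here is a minimal sketch of such a store. The types commandConfig and settingsStore are hypothetical stand-ins written for this example; only the default values come from the listing above, and the fallback behaviour for unknown names is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// commandConfig mirrors the five rule fields discussed above (hypothetical type).
type commandConfig struct {
	TimeoutMs              int
	MaxConcurrentRequests  int
	RequestVolumeThreshold int
	SleepWindowMs          int
	ErrorPercentThreshold  int
}

// defaults holds the default values listed in the article.
var defaults = commandConfig{
	TimeoutMs:              1000,
	MaxConcurrentRequests:  10,
	RequestVolumeThreshold: 20,
	SleepWindowMs:          5000,
	ErrorPercentThreshold:  50,
}

// settingsStore keeps one rule set per command name behind a read/write lock.
type settingsStore struct {
	mu    sync.RWMutex
	rules map[string]commandConfig
}

func newSettingsStore() *settingsStore {
	return &settingsStore{rules: make(map[string]commandConfig)}
}

// configure stores cfg under name, replacing every zero field with its default.
func (s *settingsStore) configure(name string, cfg commandConfig) {
	if cfg.TimeoutMs == 0 {
		cfg.TimeoutMs = defaults.TimeoutMs
	}
	if cfg.MaxConcurrentRequests == 0 {
		cfg.MaxConcurrentRequests = defaults.MaxConcurrentRequests
	}
	if cfg.RequestVolumeThreshold == 0 {
		cfg.RequestVolumeThreshold = defaults.RequestVolumeThreshold
	}
	if cfg.SleepWindowMs == 0 {
		cfg.SleepWindowMs = defaults.SleepWindowMs
	}
	if cfg.ErrorPercentThreshold == 0 {
		cfg.ErrorPercentThreshold = defaults.ErrorPercentThreshold
	}
	s.mu.Lock()
	s.rules[name] = cfg
	s.mu.Unlock()
}

// get returns the stored rules, or the defaults when name was never configured.
func (s *settingsStore) get(name string) commandConfig {
	s.mu.RLock()
	cfg, ok := s.rules[name]
	s.mu.RUnlock()
	if !ok {
		return defaults
	}
	return cfg
}

func main() {
	s := newSettingsStore()
	s.configure("ping", commandConfig{TimeoutMs: 3000, RequestVolumeThreshold: 100})
	fmt.Printf("%+v\n", s.get("ping"))
	fmt.Printf("%+v\n", s.get("unknown"))
}
```

Because lookups are by exact name, a rule registered under one name has no effect on commands executed under a different name.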

Execute the command

Four methods can be called to execute a command:

func Go(name string, run runFunc, fallback fallbackFunc) chan error
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error
func Do(name string, run runFunc, fallback fallbackFunc) error
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error

Do calls DoC, Go calls GoC, and DoC in turn calls GoC; the synchronization logic lives in DoC:

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
	// ... part of the wrapper code is omitted

	var errChan chan error
	if fallback == nil {
		errChan = GoC(ctx, name, r, nil)
	} else {
		errChan = GoC(ctx, name, r, f)
	}

	select {
	case <-done:
		return nil
	case err := <-errChan:
		return err
	}
}
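The synchronous-over-asynchronous pattern used here can be sketched with the standard library alone. In this sketch, goAsync and doSync are hypothetical stand-ins for the asynchronous and synchronous entry points; they are not the hystrix-go functions and skip everything related to circuit breaking.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a sample failure used by the demo below.
var errBoom = errors.New("boom")

// goAsync runs run in a goroutine and reports only failures on the returned channel.
func goAsync(run func() error) chan error {
	errChan := make(chan error, 1)
	go func() {
		if err := run(); err != nil {
			errChan <- err
		}
	}()
	return errChan
}

// doSync wraps run so that success is signalled on done, then blocks
// until either the success signal or an error arrives.
func doSync(run func() error) error {
	done := make(chan struct{}, 1)
	wrapped := func() error {
		if err := run(); err != nil {
			return err
		}
		done <- struct{}{}
		return nil
	}

	errChan := goAsync(wrapped)
	select {
	case <-done:
		return nil
	case err := <-errChan:
		return err
	}
}

func main() {
	fmt.Println(doSync(func() error { return nil }))
	fmt.Println(doSync(func() error { return errBoom }))
}
```

The asynchronous function only ever sends errors, so the synchronous wrapper needs its own done channel to learn about success.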

Since everything ends up in GoC, we analyze the internal logic of GoC. The code is a bit long, so let's break it down step by step:

Create the command object

	cmd := &command{
		run:      run,
		fallback: fallback,
		start:    time.Now(),
		errChan:  make(chan error, 1),
		finished: make(chan bool, 1),
	}
	// Get the circuit breaker
	circuit, _, err := GetCircuit(name)
	if err != nil {
		cmd.errChan <- err
		return cmd.errChan
	}

The command data structure is as follows:

type command struct {
	sync.Mutex

	ticket      *struct{}
	start       time.Time
	errChan     chan error
	finished    chan bool
	circuit     *CircuitBreaker
	run         runFuncC
	fallback    fallbackFuncC
	runDuration time.Duration
	events      []string
}

Field description:

  • ticket: the token used for maximum concurrency control
  • start: records the time the command started executing
  • errChan: records errors from executing the command
  • finished: signals that the command has finished executing; used to synchronize goroutines
  • circuit: stores the circuit breaker
  • run: the application logic
  • fallback: the function to execute if the application logic fails
  • runDuration: records how long the command ran
  • events: stores event type information, such as success for a successful execution, or timeout, context_canceled, etc. for a failure

The code above centers on the GetCircuit method, whose purpose is to obtain the circuit breaker. It is loaded lazily: if no circuit breaker exists for the name, one is created. The circuit breaker structure is as follows:

type CircuitBreaker struct {
	Name                   string
	open                   bool
	forceOpen              bool
	mutex                  *sync.RWMutex
	openedOrLastTestedTime int64

	executorPool *executorPool
	metrics      *metricExchange
}

Explanation of the fields:

  • Name: the name of the circuit breaker, which is the name of the command it was created for
  • open: indicates whether the circuit breaker is open
  • forceOpen: manually forces the circuit breaker open; used in unit tests
  • mutex: a read/write lock that ensures concurrency safety
  • openedOrLastTestedTime: records the time the circuit breaker was opened or last tested; a recovery attempt is made based on this time and SleepWindow
  • executorPool: used for flow control; since there is a maximum concurrency limit, every request must obtain a token
  • metrics: used to report execution status events; through it, execution status is stored in the circuit breaker's data sets for each dimension (successes, failures, timeouts, ...)

The implementation logic of executorPool and metrics is analyzed separately below.

Define token-related methods and variables

Since there is a maximum concurrency limit, tokens are used for flow control. Each request must obtain a token and returns it after use. Let's look at this code:

	ticketCond := sync.NewCond(cmd)
	ticketChecked := false
	// When the caller extracts error from returned errChan, it's assumed that
	// the ticket's been returned to executorPool. Therefore, returnTicket() can
	// not run after cmd.errorWithFallback().
	returnTicket := func() {
		cmd.Lock()
		// Avoid releasing before a ticket is acquired.
		for !ticketChecked {
			ticketCond.Wait()
		}
		cmd.circuit.executorPool.Return(cmd.ticket)
		cmd.Unlock()
	}

sync.NewCond creates a condition variable that is used to signal when the token may be returned.

returnTicket is then defined to return the token by calling the Return method of executorPool.
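The role of the condition variable is easier to see in isolation. The sketch below uses a hypothetical ticketGate type (not part of hystrix-go): the releasing side blocks until the acquiring side has recorded the outcome of its attempt, which is the same ordering guarantee returnTicket relies on.

```go
package main

import (
	"fmt"
	"sync"
)

// ticketGate lets a releaser wait until the acquire attempt has finished.
type ticketGate struct {
	mu      sync.Mutex
	cond    *sync.Cond
	checked bool      // true once an acquire attempt has completed
	ticket  *struct{} // nil when no ticket was obtained
}

func newTicketGate() *ticketGate {
	g := &ticketGate{}
	g.cond = sync.NewCond(&g.mu)
	return g
}

// markChecked records the outcome of the acquire attempt and wakes any waiter.
func (g *ticketGate) markChecked(t *struct{}) {
	g.mu.Lock()
	g.ticket = t
	g.checked = true
	g.cond.Broadcast()
	g.mu.Unlock()
}

// release blocks until markChecked has run, then hands the ticket back to pool.
// It reports whether a ticket was actually returned.
func (g *ticketGate) release(pool chan *struct{}) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	for !g.checked {
		g.cond.Wait()
	}
	if g.ticket == nil {
		return false
	}
	pool <- g.ticket
	g.ticket = nil
	return true
}

func main() {
	pool := make(chan *struct{}, 1)
	pool <- &struct{}{}

	g := newTicketGate()
	released := make(chan bool, 1)
	go func() { released <- g.release(pool) }() // may start before the ticket is taken

	g.markChecked(<-pool) // take the ticket, then allow the release to proceed
	ok := <-released
	fmt.Println("ticket returned:", ok, "available:", len(pool))
}
```

The wait sits in a for loop rather than an if, which is the usual way to use sync.Cond: the condition is rechecked every time the waiter wakes up.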

Define the method for reporting execution events

As mentioned earlier, the circuit breaker reports execution status events, through which execution status is stored in the circuit breaker's data sets for each dimension (successes, failures, timeouts, ...). So a reporting method is defined:

	reportAllEvent := func() {
		err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
		if err != nil {
			log.Printf(err.Error())
		}
	}

Start goroutine one: execute the application logic (runFunc)

The main purpose of goroutine one is to execute the application logic:

	go func() {
		defer func() { cmd.finished <- true }() // signal the end of command execution in goroutine one to goroutine two

		// The circuit is opened when the number of recent requests reaches the threshold and the error rate is high.
		// If the circuit is open, reject the request directly and return the token.
		// The circuit allows new traffic again once it detects a healthy state.
		if !cmd.circuit.AllowRequest() {
			cmd.Lock()
			// It's safe for another goroutine to go ahead releasing a nil ticket.
			ticketChecked = true
			ticketCond.Signal() // signal that the ticket may be released
			cmd.Unlock()
			// sync.Once ensures this runs only once.
			returnOnce.Do(func() {
				// Return the token
				returnTicket()
				// Execute the fallback logic
				cmd.errorWithFallback(ctx, ErrCircuitOpen)
				// Report the status events
				reportAllEvent()
			})
			return
		}
		// Control concurrency
		cmd.Lock()
		select {
		// Get a token
		case cmd.ticket = <-circuit.executorPool.Tickets:
			// Signal that the ticket has been checked
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
		default:
			// No tokens are available, i.e. the maximum concurrency has been reached: run the fallback logic directly
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrMaxConcurrency)
				reportAllEvent()
			})
			return
		}
		// Execute the application logic
		runStart := time.Now()
		runErr := run(ctx)
		returnOnce.Do(func() {
			defer reportAllEvent() // report the status events
			// Record the application execution time
			cmd.runDuration = time.Since(runStart)
			// Return the token
			returnTicket()
			// Execute the fallback if the application logic failed
			if runErr != nil {
				cmd.errorWithFallback(ctx, runErr)
				return
			}
			cmd.reportEvent("success")
		})
	}()

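To summarize goroutine one:

  • Check whether the circuit breaker is open. If it is, short-circuit directly and skip the rest of the request.
  • Acquire a token. If none is available, the maximum concurrency has been reached and the fallback runs.
  • Run the application logic.

Start goroutine two: synchronize with goroutine one and listen for errors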

First, the code:

	go func() {
		// A timer controls the timeout. We specify the timeout ourselves; the default is 1000ms
		timer := time.NewTimer(getSettings(name).Timeout)
		defer timer.Stop()

		select {
		// Synchronize with goroutine one
		case <-cmd.finished:
			// returnOnce has been executed in another goroutine

		// The context was cancelled
		case <-ctx.Done():
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ctx.Err())
				reportAllEvent()
			})
			return
		// Command execution timed out
		case <-timer.C:
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrTimeout)
				reportAllEvent()
			})
			return
		}
	}()

The logic of this goroutine is fairly clear: its purpose is to listen for cancellation and timeout.
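The interplay of the two goroutines can be reproduced in a few lines with the standard library. This is a simplified sketch under our own names (runWithTimeout, demo, errTimeout are made up here); it has no tokens, fallbacks, or metrics, and only shows how sync.Once lets exactly one of "finished", "cancelled", or "timed out" win.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errTimeout is a hypothetical sentinel for this sketch.
var errTimeout = errors.New("timeout")

// runWithTimeout runs fn in one goroutine and watches for cancellation or
// timeout in a second one; sync.Once ensures only the first outcome is reported.
func runWithTimeout(ctx context.Context, timeout time.Duration, fn func() error) error {
	var once sync.Once
	result := make(chan error, 1)
	finished := make(chan struct{})

	go func() { // goroutine one: execute the work
		defer close(finished)
		err := fn()
		once.Do(func() { result <- err })
	}()

	go func() { // goroutine two: watch for cancellation and timeout
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		select {
		case <-finished:
			// goroutine one already reported the result
		case <-ctx.Done():
			once.Do(func() { result <- ctx.Err() })
		case <-timer.C:
			once.Do(func() { result <- errTimeout })
		}
	}()

	return <-result
}

// demo runs a job that sleeps for workMs under a timeout of timeoutMs.
func demo(timeoutMs, workMs int) error {
	work := func() error {
		time.Sleep(time.Duration(workMs) * time.Millisecond)
		return nil
	}
	return runWithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond, work)
}

func main() {
	fmt.Println(demo(50, 0))   // the work finishes before the timeout
	fmt.Println(demo(10, 200)) // the timeout fires first

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancelled before the work can finish
	fmt.Println(runWithTimeout(ctx, time.Second, func() error {
		time.Sleep(100 * time.Millisecond)
		return nil
	}))
}
```

Note that a timeout only stops the caller from waiting; in this sketch the work function keeps running in its goroutine until it returns on its own.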

Draw a diagram to summarize the command execution process

So far we have analyzed everything through code, which still looks a bit messy, so here is a diagram to summarize:

Having analyzed the whole process, let's look at some core points.

Reporting status events

hystrix-go sets up a default metric collector for each command, which stores all of the circuit breaker's state, including the number of calls, failures, rejections, and so on. The structure that stores the metrics is as follows:

type DefaultMetricCollector struct {
	mutex *sync.RWMutex

	numRequests *rolling.Number
	errors      *rolling.Number

	successes               *rolling.Number
	failures                *rolling.Number
	rejects                 *rolling.Number
	shortCircuits           *rolling.Number
	timeouts                *rolling.Number
	contextCanceled         *rolling.Number
	contextDeadlineExceeded *rolling.Number

	fallbackSuccesses *rolling.Number
	fallbackFailures  *rolling.Number
	totalDuration     *rolling.Timing
	runDuration       *rolling.Timing
}

The rolling.Number structure stores the status counters, and rolling.Timing stores the timing metrics.
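A rolling counter of this kind can be sketched as one bucket per second, summed over the last 10 seconds. The rollingNumber type below is written for this article and is only an approximation of the idea: the exact window boundaries are an assumption, and locking is left out, so it is not safe for concurrent use.

```go
package main

import "fmt"

const windowSeconds = 10 // length of the statistics window discussed in the article

// rollingNumber keeps one counter per second and sums the most recent buckets.
type rollingNumber struct {
	buckets map[int64]float64 // unix second -> count
}

func newRollingNumber() *rollingNumber {
	return &rollingNumber{buckets: make(map[int64]float64)}
}

// increment adds n to the bucket for second now and drops expired buckets.
func (r *rollingNumber) increment(now int64, n float64) {
	r.buckets[now] += n
	for ts := range r.buckets {
		if ts <= now-windowSeconds {
			delete(r.buckets, ts)
		}
	}
}

// sum returns the total over buckets in the interval (now-windowSeconds, now].
func (r *rollingNumber) sum(now int64) float64 {
	total := 0.0
	for ts, v := range r.buckets {
		if ts > now-windowSeconds && ts <= now {
			total += v
		}
	}
	return total
}

func main() {
	r := newRollingNumber()
	r.increment(100, 1)
	r.increment(100, 1)
	r.increment(105, 3)
	fmt.Println(r.sum(105)) // both buckets are inside the window
	fmt.Println(r.sum(110)) // the bucket for second 100 has aged out
	fmt.Println(r.sum(115)) // everything has aged out
}
```

Old requests drop out of the sum automatically as time advances, which is why a breaker based on such counters can only trip on recent traffic.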

The final monitoring report relies on metricExchange, and the data structure is as follows:

type metricExchange struct {
	Name    string
	Updates chan *commandExecution
	Mutex   *sync.RWMutex

	metricCollectors []metricCollector.MetricCollector
}

The commandExecution structure is as follows:

type commandExecution struct {
	Types            []string      `json:"types"`             // event types, such as success, failure, ...
	Start            time.Time     `json:"start_time"`        // command start time
	RunDuration      time.Duration `json:"run_duration"`      // how long the command ran
	ConcurrencyInUse float64       `json:"concurrency_inuse"` // fraction of the command's concurrency limit in use
}

If all this is still a little confusing, a class diagram shows the relationships between them:

The metricExchange type provides a Monitor method whose main logic is to listen for status events and then write the metrics, so the whole reporting flow looks like this:

Flow control

hystrix-go uses a token algorithm for flow control: whoever obtains a token may carry on with the work and returns the token after execution. executorPool is hystrix-go's implementation of flow control; its Max field is the maximum number of concurrent requests.

type executorPool struct {
	Name    string
	Metrics *poolMetrics // Report the execution quantity indicator
	Max     int // Maximum number of concurrent requests
	Tickets chan *struct{} // represents the token
}

It also has a metrics field, which separately implements a set of methods to count executions, such as the total number of executions and the maximum concurrency. A class diagram shows the relationships:

The logic for reporting execution counts is the same as for reporting status events: data is passed over a channel. Both reporting and returning the token happen in the Return method:

func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}

	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}
	p.Tickets <- ticket
}

There are two main steps:

  • Report the number of tokens currently in use
  • Return the token
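The token scheme itself needs nothing more than a buffered channel. The sketch below uses a hypothetical ticketPool type as a stand-in for executorPool, without the metrics reporting: acquiring never blocks, so a full pool translates directly into an immediate rejection.

```go
package main

import "fmt"

// ticketPool is a buffered channel pre-filled with limit tickets.
type ticketPool struct {
	limit   int
	tickets chan *struct{}
}

func newTicketPool(limit int) *ticketPool {
	p := &ticketPool{limit: limit, tickets: make(chan *struct{}, limit)}
	for i := 0; i < limit; i++ {
		p.tickets <- &struct{}{}
	}
	return p
}

// tryAcquire takes a ticket without blocking; ok is false when none is left.
func (p *ticketPool) tryAcquire() (t *struct{}, ok bool) {
	select {
	case t = <-p.tickets:
		return t, true
	default:
		return nil, false
	}
}

// giveBack returns a ticket to the pool; nil tickets are ignored.
func (p *ticketPool) giveBack(t *struct{}) {
	if t == nil {
		return
	}
	p.tickets <- t
}

// activeCount is the number of tickets currently handed out.
func (p *ticketPool) activeCount() int {
	return p.limit - len(p.tickets)
}

func main() {
	p := newTicketPool(2)
	a, _ := p.tryAcquire()
	b, _ := p.tryAcquire()
	_, ok := p.tryAcquire()
	fmt.Println("third acquire ok:", ok, "active:", p.activeCount())

	p.giveBack(a)
	p.giveBack(b)
	fmt.Println("active after return:", p.activeCount())
}
```

The active count is derived from the channel length, so no separate counter has to be kept in sync with the tickets.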

Circuit breaker

Finally, we analyze an important method of the circuit breaker: AllowRequest. When a command is executed, this method decides whether it may run. Let's look at the main logic of this decision:

func (circuit *CircuitBreaker) AllowRequest() bool {
	return !circuit.IsOpen() || circuit.allowSingleTest()
}

Internally it calls IsOpen() and allowSingleTest():

  • IsOpen()

func (circuit *CircuitBreaker) IsOpen() bool {
	circuit.mutex.RLock()
	o := circuit.forceOpen || circuit.open
	circuit.mutex.RUnlock()
	// The circuit breaker is already open
	if o {
		return true
	}
	// If the number of requests within 10 seconds has not reached the threshold, the circuit breaker does not need to open
	if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
		return false
	}
	// The number of requests within 10 seconds has reached the threshold; if the error rate exceeds the preset value, open the circuit breaker
	if !circuit.metrics.IsHealthy(time.Now()) {
		circuit.setOpen()
		return true
	}

	return false
}

  • allowSingleTest()

Why does this method exist? Recall the SleepWindow we set in the circuit breaker rules: once the circuit breaker is open, a trial request is allowed through after the SleepWindow has elapsed. This method does exactly that:

func (circuit *CircuitBreaker) allowSingleTest() bool {
	circuit.mutex.RLock()
	defer circuit.mutex.RUnlock()

	// Get the current timestamp
	now := time.Now().UnixNano()
	openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
	// The circuit breaker is open, and the current time is later than (the time it was opened or last tested + SleepWindow)
	if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
		// Replace openedOrLastTestedTime
		swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
		if swapped {
			log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
		}
		return swapped
	}

	return false
}
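The compare-and-swap is what makes the trial request "single": even if many goroutines pass the time check at once, only one of them wins the swap. The sketch below isolates that idea in a hypothetical probeGate type with plain int64 timestamps; it is not the library's code and leaves out the open flag and the lock.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// probeGate lets exactly one caller through per sleep window.
type probeGate struct {
	sleepWindow  int64 // nanoseconds
	lastTestedAt int64 // time the breaker opened or was last probed, in nanoseconds
}

// allow reports whether the caller at time now may send a trial request.
func (g *probeGate) allow(now int64) bool {
	last := atomic.LoadInt64(&g.lastTestedAt)
	if now <= last+g.sleepWindow {
		return false // still inside the sleep window
	}
	// Only the caller that wins the swap may probe; the others lose the
	// swap or see the updated timestamp and are rejected.
	return atomic.CompareAndSwapInt64(&g.lastTestedAt, last, now)
}

// countWinners calls allow from many goroutines at the same instant
// and returns how many of them were let through.
func countWinners(g *probeGate, now int64, callers int) int {
	var wg sync.WaitGroup
	var winners int64
	for i := 0; i < callers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if g.allow(now) {
				atomic.AddInt64(&winners, 1)
			}
		}()
	}
	wg.Wait()
	return int(winners)
}

func main() {
	g := &probeGate{sleepWindow: 5, lastTestedAt: 100}
	fmt.Println(g.allow(103))             // inside the sleep window
	fmt.Println(countWinners(g, 106, 50)) // concurrent callers after the window
	fmt.Println(g.allow(107))             // the probe restarted the window
}
```

Each successful probe moves the timestamp forward, so a failed trial request simply starts a new sleep window.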

The logic for closing the circuit breaker lives in ReportEvent, the method that reports status metrics. Finally, let's look at the implementation of ReportEvent:

func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
	if len(eventTypes) == 0 {
		return fmt.Errorf("no event types sent for metrics")
	}

	circuit.mutex.RLock()
	o := circuit.open
	circuit.mutex.RUnlock()
	// If the reported status event is success and the circuit breaker is currently open, the downstream service has recovered and the circuit breaker can be closed
	if eventTypes[0] == "success" && o {
		circuit.setClose()
	}

	var concurrencyInUse float64
	if circuit.executorPool.Max > 0 {
		concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
	}

	select {
	// Report the status metrics; this is what Monitor, described above, receives
	case circuit.metrics.Updates <- &commandExecution{
		Types:            eventTypes,
		Start:            start,
		RunDuration:      runDuration,
		ConcurrencyInUse: concurrencyInUse,
	}:
	default:
		return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
	}

	return nil
}
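The select with a default branch at the end of ReportEvent is a non-blocking send: if the metrics channel is full, the event is dropped with an error instead of stalling the request. A minimal sketch of that producer side and a matching consumer, using made-up names (event, report, drain) rather than the library's types:

```go
package main

import "fmt"

// event is a hypothetical stand-in for commandExecution.
type event struct{ kind string }

// report tries to enqueue ev without blocking and returns false when the
// buffer is full, so a slow consumer can never stall the caller.
func report(updates chan<- event, ev event) bool {
	select {
	case updates <- ev:
		return true
	default:
		return false
	}
}

// drain plays the role of the monitor loop in this sketch: it consumes
// every queued event and tallies them by kind.
func drain(updates chan event) map[string]int {
	counts := make(map[string]int)
	for {
		select {
		case ev := <-updates:
			counts[ev.kind]++
		default:
			return counts
		}
	}
}

func main() {
	updates := make(chan event, 2)
	fmt.Println(report(updates, event{"success"}))
	fmt.Println(report(updates, event{"timeout"}))
	fmt.Println(report(updates, event{"success"})) // buffer is full, event dropped
	fmt.Println(drain(updates))
}
```

The trade-off is that metrics can be lost under heavy load, in exchange for request handling that never waits on reporting.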

Visualize hystrix reporting information

From the analysis above, we know that hystrix-go reports status events and execution count events. So how can we view these metrics?

The designers anticipated this and built a dashboard for viewing hystrix's reports. Add the following code when the service starts:

hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)

Then open http://127.0.0.1:81/hystrix-dashboard in a browser to observe the metrics.

Conclusion

That brings this article to a close. Implementing a circuit breaker is not simple, and many factors have to be considered. In a microservice architecture in particular, circuit breakers are essential: they need to be implemented at the framework level and also applied according to specific business scenarios, all of which deserves careful thought. The circuit breaker framework introduced in this article is quite complete, and its design ideas are well worth learning.

The code has been uploaded to github: github.com/asong2020/G…

Welcome to our official account: [Golang Dream Factory]
