# A Circuit Breaker Framework for Microservice Architectures: Hystrix-Go
## Background
With the hype surrounding microservice architecture, several concepts have come to the fore. Two phrases are inseparable from microservices: high cohesion and low coupling, and the ultimate goal of microservice architecture design is to achieve them. In a microservice architecture, each microservice implements a single business function and can evolve independently. An application may be composed of many microservices, which exchange data through remote calls. This produces dependency relationships such as the following.
Microservice A calls microservices C and D, microservice B depends on microservice E, and microservice D depends on service F. This is only a small example; the dependencies between services in real business systems are far more complex. If a service that others depend on becomes slow or unavailable, its callers keep waiting on it while holding threads, connections, and memory. These resources pile up until the callers fail as well, and the failure spreads back up the call chain until the system crashes. This is the avalanche effect of microservices.
To stop this avalanche effect, the circuit breaker mechanism was proposed to protect the call links between microservices. Most of us know circuit breakers from electrical circuits, where a fuse or breaker cuts the current on overload. So what is a circuit breaker in microservices?
When a microservice in the call chain is unavailable or responds too slowly, the service is degraded: calls to that node are cut off (the circuit is tripped), and an error response is returned immediately instead of waiting. Once calls to that node respond normally again, the call link is restored.
In this article, we introduce an open source circuit breaker framework: hystrix-go.
## The circuit breaker framework: hystrix-go
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services, and third-party libraries, stop cascading failures, and enable resilience in complex distributed systems where failure is inevitable. hystrix-go lets Go programmers easily build applications with execution semantics similar to those of the Java-based Hystrix library. This article covers hystrix-go from basic usage through to source code analysis.
### Installation
```shell
go get -u github.com/afex/hystrix-go/hystrix
```
### Quick start
hystrix-go really works out of the box and is easy to use. There are two main steps:
- Configure the circuit breaker rules; if you skip this step, the default configuration is used. The available methods are:

```go
func Configure(cmds map[string]CommandConfig)
func ConfigureCommand(name string, config CommandConfig)
```

`Configure` sets up several commands at once from a map, while `ConfigureCommand` sets up a single command; `Configure` simply calls `ConfigureCommand` for each entry.
- Define the application logic that depends on the external system (`runFunc`) and the logic to execute when the service fails (`fallbackFunc`). The available methods are:

```go
func Go(name string, run runFunc, fallback fallbackFunc) chan error                    // calls GoC internally
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error
func Do(name string, run runFunc, fallback fallbackFunc) error                         // calls DoC internally
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error // calls GoC and waits for the result synchronously
```
The difference between `Go` and `Do` is asynchronous versus synchronous: `Go` returns a channel of errors immediately, while `Do` blocks until the command finishes. `Do` calls `DoC`, and `DoC` calls `GoC` and then waits on its result, so all of them ultimately go through `GoC`. We will analyze this later.
As an example, we add an API-level circuit breaker middleware to the Gin framework:

```go
// The code has been uploaded to GitHub
package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"os"

	"github.com/afex/hystrix-go/hystrix"
	"github.com/gin-gonic/gin"
)

var CircuitBreakerName = "api_%s_circuit_breaker"

var (
	filename = "./circuit_breaker.log" // log file path (assumed; not shown in the original)
	f        *os.File
)

func CircuitBreakerWrapper(ctx *gin.Context) {
	// One command (and therefore one circuit breaker) per request path.
	name := fmt.Sprintf(CircuitBreakerName, ctx.Request.URL.Path)
	hystrix.Do(name, func() error {
		ctx.Next() // run the actual handler inside the command
		code := ctx.Writer.Status()
		if code != http.StatusOK {
			return fmt.Errorf("status code %d", code)
		}
		return nil
	}, func(err error) error {
		if err != nil {
			// Monitoring report (not implemented)
			_, _ = io.WriteString(f, fmt.Sprintf("circuitBreaker and err is %s\n", err.Error())) // append to the log file
			fmt.Printf("circuitBreaker and err is %s\n", err.Error())
			// Return the circuit breaker error, unless the handler already wrote a response.
			// Abort either way so Gin does not run the handler after the middleware returns.
			if !ctx.Writer.Written() {
				ctx.AbortWithStatusJSON(http.StatusServiceUnavailable, gin.H{"msg": err.Error()})
			} else {
				ctx.Abort()
			}
		}
		return nil
	})
}

func init() {
	// Configure under the same name the middleware builds for this route.
	hystrix.ConfigureCommand(fmt.Sprintf(CircuitBreakerName, "/api/ping/baidu"), hystrix.CommandConfig{
		Timeout:                3000, // timeout for executing the command, in ms (3 s)
		MaxConcurrentRequests:  10,   // maximum number of concurrent commands
		RequestVolumeThreshold: 100,  // requests needed in the 10 s window before the error rate is evaluated
		SleepWindow:            2000, // once open, wait this long (ms) before testing whether the service is available
		ErrorPercentThreshold:  20,   // open when requests >= RequestVolumeThreshold and the error rate reaches 20%
	})
	var err error
	// Create the log file if needed and open it for appending.
	f, err = os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	defer f.Close()
	hystrixStreamHandler := hystrix.NewStreamHandler()
	hystrixStreamHandler.Start()
	go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)

	r := gin.Default()
	// The circuit breaker middleware must be registered before the handler
	// so that the handler runs inside hystrix.Do.
	r.GET("/api/ping/baidu", CircuitBreakerWrapper, func(c *gin.Context) {
		resp, err := http.Get("https://www.baidu.com")
		if err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()})
			return
		}
		resp.Body.Close()
		c.JSON(http.StatusOK, gin.H{"msg": "success"})
	})
	r.Run() // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080")
}
```
Load test command:

```shell
wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/ping/baidu
```
Running results:

```text
circuitBreaker and err is status code 500
circuitBreaker and err is status code 500
...
circuitBreaker and err is hystrix: max concurrency
circuitBreaker and err is hystrix: max concurrency
...
circuitBreaker and err is hystrix: circuit open
circuitBreaker and err is hystrix: circuit open
...
```
Analysis of the errors:

- `circuitBreaker and err is status code 500`: the request got no answer because we had disconnected the network, so the handler returned 500.
- `circuitBreaker and err is hystrix: max concurrency`: we set `MaxConcurrentRequests` to 10, but the load test tool uses 100 concurrent connections, so requests beyond the tenth concurrent one are rejected.
- `circuitBreaker and err is hystrix: circuit open`: we set `RequestVolumeThreshold` to 100, so once the number of requests in the 10 s window reaches 100 and the error rate is at or above `ErrorPercentThreshold` (20%), the circuit opens.
Here's a quick recap of the example above:

- Add an API-level circuit breaker middleware.
- Initialize the circuit breaker configuration.
- Start the hystrix metrics stream so the reported data can be visualized; the stream is served at http://localhost:81.
## Process analysis
I originally planned to analyze the source code in full, but there is quite a lot of it, so instead I'll walk through the execution flow and look at the core code along the way.
### Configuring circuit breaker rules

Two methods can configure circuit breaker rules, and both end up calling `ConfigureCommand`; there is no special logic here. Any field left at zero falls back to these defaults:
```go
var (
	// DefaultTimeout is how long to wait for command to complete, in milliseconds
	DefaultTimeout = 1000
	// DefaultMaxConcurrent is how many commands of the same type can run at the same time
	DefaultMaxConcurrent = 10
	// DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health
	DefaultVolumeThreshold = 20
	// DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery
	DefaultSleepWindow = 5000
	// DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests
	DefaultErrorPercentThreshold = 50
	// DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing.
	DefaultLogger = NoopLogger{}
)
```
The configuration rules are as follows:

- `Timeout`: the timeout for executing a command, in milliseconds; the default is 1000 ms.
- `MaxConcurrentRequests`: the maximum number of concurrent commands; the default is 10.
- `SleepWindow`: once the circuit is open, how long to wait before trying whether the service is available again; the default is 5000 ms.
- `RequestVolumeThreshold`: one of the conditions for opening the circuit. It is the number of requests counted in a 10 s window (hard-coded); once this many requests have been seen, the error rate decides whether to open the circuit. The default is 20.
- `ErrorPercentThreshold`: the other condition. When the number of requests is at least `RequestVolumeThreshold` and the error percentage reaches this value, the circuit opens; the default is 50.

These settings are stored in a map keyed by command name.
### Executing a command

A command can be executed with any of these four methods:
```go
func Go(name string, run runFunc, fallback fallbackFunc) chan error
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error
func Do(name string, run runFunc, fallback fallbackFunc) error
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error
```
`Do` calls `DoC`, `Go` calls `GoC`, and `DoC` also calls `GoC`, adding the synchronization logic on top:
```go
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
	// ... part of the wrapper code is omitted
	var errChan chan error
	if fallback == nil {
		errChan = GoC(ctx, name, r, nil)
	} else {
		errChan = GoC(ctx, name, r, f)
	}
	select {
	case <-done:
		return nil
	case err := <-errChan:
		return err
	}
}
```
Since everything ends up in `GoC`, let's analyze its internal logic. The code is fairly long, so we'll break it into steps:
#### Creating the command object
```go
cmd := &command{
	run:      run,
	fallback: fallback,
	start:    time.Now(),
	errChan:  make(chan error, 1),
	finished: make(chan bool, 1),
}
// Get the circuit breaker
circuit, _, err := GetCircuit(name)
if err != nil {
	cmd.errChan <- err
	return cmd.errChan
}
```
Here is the `command` data structure:
```go
type command struct {
	sync.Mutex
	ticket      *struct{}
	start       time.Time
	errChan     chan error
	finished    chan bool
	circuit     *CircuitBreaker
	run         runFuncC
	fallback    fallbackFuncC
	runDuration time.Duration
	events      []string
}
```
Field descriptions:

- `ticket`: a token used for maximum-concurrency control.
- `start`: when the command started executing.
- `errChan`: carries the command's execution error.
- `finished`: signals that the command has finished executing; used to synchronize the goroutines.
- `circuit`: the circuit breaker.
- `run`: the application logic.
- `fallback`: the function executed when the application logic fails.
- `runDuration`: how long the command took to run.
- `events`: the event types recorded during execution, such as `success`, or failures such as `timeout` and `context_canceled`.
The last part of that snippet calls `GetCircuit`, which fetches the circuit breaker for the command. Breakers are created lazily: if none exists for the name yet, one is created. The circuit breaker struct is:
```go
type CircuitBreaker struct {
	Name                   string
	open                   bool
	forceOpen              bool
	mutex                  *sync.RWMutex
	openedOrLastTestedTime int64
	executorPool           *executorPool
	metrics                *metricExchange
}
```
These fields are:

- `Name`: the name of the circuit breaker, which is the name of the command it was created for.
- `open`: whether the circuit is open.
- `forceOpen`: manually forces the circuit open; used in unit tests.
- `mutex`: a read/write lock that keeps the breaker safe for concurrent use.
- `openedOrLastTestedTime`: the time the circuit was last opened or last tested; together with `SleepWindow` it decides when to attempt recovery.
- `executorPool`: flow control. Because maximum concurrency is limited, every request must obtain a token from this pool.
- `metrics`: receives the execution-status events and stores the breaker's statistics for each dimension (successes, failures, timeouts, ...).

The implementations of `executorPool` and `metrics` are analyzed separately later.
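Lazy creation like this is commonly done with double-checked locking on a `sync.RWMutex`. The sketch below models that pattern with a placeholder `circuitBreaker` type and hypothetical `getCircuit`/`breakers` names rather than the library's own: readers take only the shared lock on the fast path, and the write path re-checks the map after acquiring the exclusive lock, so concurrent first calls still create a single breaker.

```go
package main

import (
	"fmt"
	"sync"
)

// circuitBreaker is a placeholder for the real struct.
type circuitBreaker struct{ Name string }

var (
	breakersMu sync.RWMutex
	breakers   = map[string]*circuitBreaker{}
)

// getCircuit returns the breaker for name, creating it on first use.
// The bool reports whether this call created it.
func getCircuit(name string) (*circuitBreaker, bool) {
	breakersMu.RLock()
	cb, ok := breakers[name]
	breakersMu.RUnlock()
	if ok {
		return cb, false // fast path: shared read lock only
	}
	breakersMu.Lock()
	defer breakersMu.Unlock()
	// Double-check: another goroutine may have created it between
	// releasing the read lock and acquiring the write lock.
	if cb, ok := breakers[name]; ok {
		return cb, false
	}
	cb = &circuitBreaker{Name: name}
	breakers[name] = cb
	return cb, true
}

func main() {
	var wg sync.WaitGroup
	var mu sync.Mutex
	created := 0
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, isNew := getCircuit("api"); isNew {
				mu.Lock()
				created++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println(created) // 1
}
```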
#### Defining token-related variables and methods

Because we enforce a maximum concurrency, tokens are used for flow control: every request needs a token and returns it after use. Here is the code:
```go
ticketCond := sync.NewCond(cmd)
ticketChecked := false
// When the caller extracts error from returned errChan, it's assumed that
// the ticket's been returned to executorPool. Therefore, returnTicket() can
// not run after cmd.errorWithFallback().
returnTicket := func() {
	cmd.Lock()
	// Avoid releasing before a ticket is acquired.
	for !ticketChecked {
		ticketCond.Wait()
	}
	cmd.circuit.executorPool.Return(cmd.ticket)
	cmd.Unlock()
}
```
`sync.NewCond` creates a condition variable, using `cmd`'s embedded mutex as its lock, that signals when it is safe to return the token.
`returnTicket` then waits until the ticket check is done and calls `Return` to give the token back.
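The handshake between `returnTicket` and the acquiring goroutine can be reproduced in isolation. In this sketch the names are hypothetical and a one-token buffered channel stands in for `executorPool.Tickets`: `returnTicket` may start first, but it waits on the condition variable until `ticketChecked` is set, so the token is only returned after the acquisition attempt. A nil ticket (acquisition failed) is skipped, as in `Return`.

```go
package main

import (
	"fmt"
	"sync"
)

// cmd embeds a Mutex so that *cmd satisfies sync.Locker and can be
// passed to sync.NewCond, as command is in hystrix-go.
type cmd struct {
	sync.Mutex
	ticket *struct{}
}

// simulate acquires a ticket from a one-token pool while returnTicket
// runs concurrently, then reports how many tokens are back in the pool.
func simulate() int {
	tickets := make(chan *struct{}, 1) // hypothetical one-token pool
	tickets <- &struct{}{}

	c := &cmd{}
	ticketCond := sync.NewCond(c)
	ticketChecked := false

	returnTicket := func() {
		c.Lock()
		for !ticketChecked {
			ticketCond.Wait() // unlocks c while waiting, relocks on wake-up
		}
		if c.ticket != nil { // nothing to return if acquisition failed
			tickets <- c.ticket
		}
		c.Unlock()
	}

	done := make(chan struct{})
	go func() {
		returnTicket() // may start before the ticket has been acquired
		close(done)
	}()

	c.Lock()
	select {
	case c.ticket = <-tickets:
	default: // pool empty: c.ticket stays nil
	}
	ticketChecked = true
	ticketCond.Signal() // wake returnTicket if it is already waiting
	c.Unlock()

	<-done
	return len(tickets)
}

func main() {
	fmt.Println(simulate()) // 1
}
```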
#### Defining a method to report execution events

As mentioned earlier, the circuit breaker reports execution-status events, and this information is stored in the breaker's statistics for each dimension (successes, failures, timeouts, ...). So a reporting function is defined:
```go
reportAllEvent := func() {
	err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
	if err != nil {
		log.Printf(err.Error())
	}
}
```
#### Goroutine 1: running the application logic (`runFunc`)

The main job of goroutine 1 is to run the application logic:
```go
go func() {
	// Signal the end of execution to goroutine 2 when goroutine 1 finishes.
	defer func() { cmd.finished <- true }()

	// The circuit opens when recent requests reach the volume threshold and the error rate is high.
	// If the circuit is open, reject the request immediately and return the token;
	// once the breaker judges the service healthy again, it lets new traffic through.
	if !cmd.circuit.AllowRequest() {
		cmd.Lock()
		// It's safe for another goroutine to go ahead releasing a nil ticket.
		ticketChecked = true
		ticketCond.Signal() // signal that the ticket check is done
		cmd.Unlock()
		// sync.Once guarantees this runs only once.
		returnOnce.Do(func() {
			// Return the token
			returnTicket()
			// Execute the fallback logic
			cmd.errorWithFallback(ctx, ErrCircuitOpen)
			// Report the status events
			reportAllEvent()
		})
		return
	}

	// Control concurrency
	cmd.Lock()
	select {
	// Got a token
	case cmd.ticket = <-circuit.executorPool.Tickets:
		// Signal that the ticket check is done
		ticketChecked = true
		ticketCond.Signal()
		cmd.Unlock()
	default:
		// No token left: the maximum concurrency has been reached, so go straight to the fallback
		ticketChecked = true
		ticketCond.Signal()
		cmd.Unlock()
		returnOnce.Do(func() {
			returnTicket()
			cmd.errorWithFallback(ctx, ErrMaxConcurrency)
			reportAllEvent()
		})
		return
	}

	// Execute the application logic
	runStart := time.Now()
	runErr := run(ctx)
	returnOnce.Do(func() {
		defer reportAllEvent() // report the status events
		// Record how long the application logic ran
		cmd.runDuration = time.Since(runStart)
		// Return the token
		returnTicket()
		// Execute the fallback if the application logic failed
		if runErr != nil {
			cmd.errorWithFallback(ctx, runErr)
			return
		}
		cmd.reportEvent("success")
	})
}()
```
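To summarize goroutine 1:

- Check whether the circuit allows the request. If the circuit is open, go straight to the fallback without calling the application logic.
- Try to take a token; if none is left (maximum concurrency reached), go straight to the fallback.
- Otherwise run the application logic, then return the token, run the fallback on error, and report the events.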
#### Goroutine 2: watching for completion, cancellation, and timeout

First, the code:
```go
go func() {
	// A timer enforces the timeout we configured (default 1000 ms)
	timer := time.NewTimer(getSettings(name).Timeout)
	defer timer.Stop()
	select {
	// Goroutine 1 has finished
	case <-cmd.finished:
		// returnOnce has been executed in another goroutine
	// The context was canceled
	case <-ctx.Done():
		returnOnce.Do(func() {
			returnTicket()
			cmd.errorWithFallback(ctx, ctx.Err())
			reportAllEvent()
		})
		return
	// The command timed out
	case <-timer.C:
		returnOnce.Do(func() {
			returnTicket()
			cmd.errorWithFallback(ctx, ErrTimeout)
			reportAllEvent()
		})
		return
	}
}()
```
The logic of this goroutine is straightforward: it listens for context cancellation and timeouts, and does nothing if goroutine 1 finishes first.
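#### Summarizing the command execution flow

Walking through the code piece by piece can feel messy, so here is the whole flow in brief: `GoC` creates the `command` and fetches its circuit breaker; goroutine 1 checks `AllowRequest`, takes a token, and runs `runFunc`; goroutine 2 waits for goroutine 1 to finish, for the context to be canceled, or for the timeout. Whichever outcome arrives first, guarded by `returnOnce`, returns the token, runs the fallback if needed, and reports the events.

With the overall flow covered, let's look at a few core pieces in more detail.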
### Reporting status events

hystrix-go keeps a default metric collector for each command that stores all of its circuit breaker statistics, including the number of calls, failures, and rejections. The collector struct is:
```go
type DefaultMetricCollector struct {
	mutex                   *sync.RWMutex
	numRequests             *rolling.Number
	errors                  *rolling.Number
	successes               *rolling.Number
	failures                *rolling.Number
	rejects                 *rolling.Number
	shortCircuits           *rolling.Number
	timeouts                *rolling.Number
	contextCanceled         *rolling.Number
	contextDeadlineExceeded *rolling.Number
	fallbackSuccesses       *rolling.Number
	fallbackFailures        *rolling.Number
	totalDuration           *rolling.Timing
	runDuration             *rolling.Timing
}
```
`rolling.Number` stores the counter metrics, and `rolling.Timing` stores the timing metrics.
Reporting ultimately goes through `metricExchange`, whose structure is:
```go
type metricExchange struct {
	Name    string
	Updates chan *commandExecution
	Mutex   *sync.RWMutex

	metricCollectors []metricCollector.MetricCollector
}
```
Each reported event is a `commandExecution`:
```go
type commandExecution struct {
	Types            []string      `json:"types"`             // event types, such as success, failure, ...
	Start            time.Time     `json:"start_time"`        // when the command started
	RunDuration      time.Duration `json:"run_duration"`      // how long the command ran
	ConcurrencyInUse float64       `json:"concurrency_inuse"` // fraction of executorPool tickets in use
}
```
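That is a lot of structures, so to put their relationship plainly: each `CircuitBreaker` holds a `metricExchange`, which receives `commandExecution` updates and passes them on to its metric collectors, such as `DefaultMetricCollector`.
`metricExchange` provides a `Monitor` method whose main job is to listen for status events and write them into the metrics. So the reporting flow is: `ReportEvent` sends a `commandExecution` on the `Updates` channel, and `Monitor` reads it and updates every collector.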
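The `Monitor` loop is essentially a channel consumer that fans each update out to every registered collector. The sketch below models that shape with hypothetical `execution` and `collector` types and a `monitor` function; the real collectors track many more counters and timings.

```go
package main

import (
	"fmt"
	"sync"
)

// execution is a cut-down commandExecution.
type execution struct{ Types []string }

// collector is a hypothetical stand-in for a MetricCollector.
type collector struct {
	mu     sync.Mutex
	counts map[string]int
}

func (c *collector) Update(e execution) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, t := range e.Types {
		c.counts[t]++
	}
}

// monitor drains updates and hands each one to every collector,
// waiting for all of them before reading the next update.
func monitor(updates <-chan execution, collectors []*collector, done chan<- struct{}) {
	for u := range updates {
		var wg sync.WaitGroup
		for _, c := range collectors {
			wg.Add(1)
			go func(c *collector) {
				defer wg.Done()
				c.Update(u)
			}(c)
		}
		wg.Wait()
	}
	close(done)
}

func main() {
	updates := make(chan execution, 8)
	c := &collector{counts: map[string]int{}}
	done := make(chan struct{})
	go monitor(updates, []*collector{c}, done)

	updates <- execution{Types: []string{"success"}}
	updates <- execution{Types: []string{"failure", "fallback-success"}}
	updates <- execution{Types: []string{"success"}}
	close(updates)
	<-done
	fmt.Println(c.counts["success"], c.counts["failure"]) // 2 1
}
```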
### Flow control

hystrix-go uses tokens for flow control: a request that obtains a token can proceed, and it returns the token when it finishes, so at most `Max` requests run at once (a counting semaphore rather than a rate limiter). `executorPool` is hystrix-go's implementation of this; its field `Max` is the maximum number of concurrent executions.
```go
type executorPool struct {
	Name    string
	Metrics *poolMetrics   // reports execution-count metrics
	Max     int            // maximum number of concurrent executions
	Tickets chan *struct{} // the tokens
}
```
The pool also has its own metrics (`poolMetrics`), with a separate set of methods that count executions, such as the total number of executions and the maximum number of concurrent executions.
Reporting execution counts works the same way as reporting status events, communicating over a channel; both the reporting and the token return happen in `Return`:
```go
func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}

	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}
	p.Tickets <- ticket
}
```
It performs two steps:

- Reports the number of tokens currently in use (`ActiveCount`).
- Returns the token to the pool.
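A buffered channel pre-filled with `Max` tickets behaves like a counting semaphore, which is the idea behind `executorPool`. The sketch below is a simplified version: `newExecutorPool(max)` and `TryAcquire` are hypothetical helpers, and the real pool also reports `poolMetrics`. Acquisition is non-blocking, so a request that finds no ticket fails immediately, which is what produces `hystrix: max concurrency` in the load test earlier.

```go
package main

import "fmt"

// executorPool: a buffered channel pre-filled with Max tickets.
type executorPool struct {
	Max     int
	Tickets chan *struct{}
}

func newExecutorPool(max int) *executorPool {
	p := &executorPool{Max: max, Tickets: make(chan *struct{}, max)}
	for i := 0; i < max; i++ {
		p.Tickets <- &struct{}{}
	}
	return p
}

// TryAcquire takes a ticket without blocking; nil means "max concurrency".
func (p *executorPool) TryAcquire() *struct{} {
	select {
	case t := <-p.Tickets:
		return t
	default:
		return nil
	}
}

// ActiveCount is the number of tickets currently handed out.
func (p *executorPool) ActiveCount() int { return p.Max - len(p.Tickets) }

// Return gives a ticket back; a nil ticket (never acquired) is ignored.
func (p *executorPool) Return(t *struct{}) {
	if t == nil {
		return
	}
	p.Tickets <- t
}

func main() {
	p := newExecutorPool(2)
	a, b, c := p.TryAcquire(), p.TryAcquire(), p.TryAcquire()
	fmt.Println(a != nil, b != nil, c == nil, p.ActiveCount()) // true true true 2
	p.Return(a)
	fmt.Println(p.ActiveCount()) // 1
}
```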
### The circuit breaker

Finally, let's analyze one of the circuit breaker's most important methods, `AllowRequest`. Before a command executes, this method decides whether it may run. Its main logic is:
```go
func (circuit *CircuitBreaker) AllowRequest() bool {
	return !circuit.IsOpen() || circuit.allowSingleTest()
}
```
Internally it calls `IsOpen()` and `allowSingleTest()`:
#### IsOpen()
```go
func (circuit *CircuitBreaker) IsOpen() bool {
	circuit.mutex.RLock()
	o := circuit.forceOpen || circuit.open
	circuit.mutex.RUnlock()
	// The circuit is already open
	if o {
		return true
	}
	// Fewer requests than RequestVolumeThreshold in the last 10 s: not enough data, keep the circuit closed
	if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
		return false
	}
	// Enough requests: if the error rate has reached ErrorPercentThreshold, open the circuit
	if !circuit.metrics.IsHealthy(time.Now()) {
		circuit.setOpen()
		return true
	}
	return false
}
```
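The two checks in `IsOpen` can be written as a small pure function, which makes it easy to work through the numbers from the Gin example (`RequestVolumeThreshold: 100`, `ErrorPercentThreshold: 20`). `shouldOpen` is a hypothetical helper, and rounding the error percentage to the nearest integer is an assumption of this sketch.

```go
package main

import "fmt"

// shouldOpen models IsOpen's two checks: too little traffic never trips
// the circuit; otherwise it opens when the error percentage is at or
// above the threshold (i.e. when "error percent < threshold" is false).
func shouldOpen(requests, failures, volumeThreshold uint64, errorPercentThreshold int) bool {
	if requests == 0 || requests < volumeThreshold {
		return false // not enough requests in the 10 s window to judge
	}
	// Error percentage rounded to the nearest integer (assumption of this sketch).
	errorPercent := int(float64(failures)/float64(requests)*100 + 0.5)
	return errorPercent >= errorPercentThreshold
}

func main() {
	// Thresholds from the Gin example: volume 100, error percentage 20.
	fmt.Println(shouldOpen(80, 80, 100, 20))  // false: only 80 requests, below the volume threshold
	fmt.Println(shouldOpen(150, 30, 100, 20)) // true: 30/150 = 20% errors
	fmt.Println(shouldOpen(150, 15, 100, 20)) // false: 15/150 = 10% errors
}
```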
#### allowSingleTest()
Why do we need this method? Recall the `SleepWindow` setting in the circuit breaker rules: once the circuit is open and `SleepWindow` has elapsed, a single request is let through to test whether the service has recovered. This method implements exactly that:
```go
func (circuit *CircuitBreaker) allowSingleTest() bool {
	circuit.mutex.RLock()
	defer circuit.mutex.RUnlock()
	// Current timestamp
	now := time.Now().UnixNano()
	openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
	// The circuit is open and the current time is past (last opened/tested time + SleepWindow)
	if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
		// Swap in the new openedOrLastTestedTime; only one caller can win
		swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now)
		if swapped {
			log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name)
		}
		return swapped
	}
	return false
}
```
The logic that closes the circuit again lives in `ReportEvent`, the method that reports status metrics. Finally, let's look at its implementation:
```go
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
	if len(eventTypes) == 0 {
		return fmt.Errorf("no event types sent for metrics")
	}
	circuit.mutex.RLock()
	o := circuit.open
	circuit.mutex.RUnlock()
	// A success while the circuit is open means the downstream service has recovered, so close the circuit
	if eventTypes[0] == "success" && o {
		circuit.setClose()
	}
	var concurrencyInUse float64
	if circuit.executorPool.Max > 0 {
		concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
	}
	select {
	// Report the status metrics; they are consumed by Monitor above
	case circuit.metrics.Updates <- &commandExecution{
		Types:            eventTypes,
		Start:            start,
		RunDuration:      runDuration,
		ConcurrencyInUse: concurrencyInUse,
	}:
	default:
		return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
	}
	return nil
}
```
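The `select` with a `default` branch at the end of `ReportEvent` is a non-blocking send: if the metrics consumer falls behind and the buffered `Updates` channel is full, the event is dropped and an error is returned instead of blocking the request path. A minimal sketch of that pattern, where the `report` function and `errAtCapacity` value are hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

var errAtCapacity = errors.New("metrics channel is at capacity")

// report enqueues an event without blocking; a full buffer drops the
// event and returns errAtCapacity instead of stalling the caller.
func report(updates chan<- string, event string) error {
	select {
	case updates <- event:
		return nil
	default:
		return errAtCapacity
	}
}

func main() {
	updates := make(chan string, 2) // nobody drains this channel
	for _, e := range []string{"success", "success", "timeout"} {
		fmt.Println(e, report(updates, e))
	}
	// Output:
	// success <nil>
	// success <nil>
	// timeout metrics channel is at capacity
}
```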
## Visualizing hystrix metrics
From the analysis above, we know that hystrix-go reports status events and execution-count events. So how can we view these metrics?
The designers anticipated this: hystrix-go ships a stream handler that publishes these metrics for the Hystrix Dashboard to display. Add the following code when the service starts:
```go
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
```
Then open http://127.0.0.1:81/hystrix-dashboard in a browser to observe the metrics.
## Conclusion
And so the story draws to a close. Implementing a circuit breaker is not simple, and there are many factors to consider. In a microservice architecture circuit breakers are essential, and it is not enough to implement them at the framework level: you also need to apply them according to your specific business scenarios, which deserves careful thought. The circuit breaker framework introduced here is quite complete, and its design is well worth learning from.
The code has been uploaded to GitHub: github.com/asong2020/G…
Feel free to follow my official account: [Golang Dream Factory]