Why do we need circuit breakers
In a microservice cluster, almost every application depends on some number of external services, and at any time it may run into slow network connections, timeouts, overloaded dependencies, or dependencies that are entirely unavailable. Under high concurrency, if the caller takes no precautions and keeps sending requests to the faulty service, the failure can easily cascade into an avalanche across the whole cluster. For example, in a high-concurrency ordering scenario, the order service typically relies on the following services:
- Product service
- Account service
- Inventory service
If the account service is overloaded at this point, the order service can only wait passively for the account service to return an error or for the request to time out. Order requests pile up, and these doomed requests still occupy system resources: CPU, memory, connections, and so on. Eventually the order service itself becomes unavailable, and even after the account service recovers, the order service cannot recover on its own.
If there is an active protection mechanism for this scenario, the order service can at least keep itself running, and it can recover in step with the account service once the latter is restored. In service governance, this self-protection mechanism is called circuit breaking.
Circuit breaking
Circuit breaking is a mechanism by which the caller protects itself (and, as a side effect, protects the callee). The object of circuit breaking is an external service.
Degradation
Degradation is the self-protection mechanism of the callee (the service provider) against overload caused by its own lack of resources. The object of degradation is the service itself.
The term comes from the fuse in everyday electrical circuits. When the load is too high (the current is too large), the fuse blows automatically to keep the circuit from burning out. Many technical ideas are borrowed from everyday life in this way.
How it works
Circuit breakers generally have three states:
- Closed: the default state, in which requests reach the target service. The numbers of successful and failed requests within the window are counted, and when the error rate reaches the threshold the breaker switches to the open state.
- Open: requests fail immediately with an error; if a fallback is configured, the fallback method is called instead.
- Half-open: a timeout is started when the breaker enters the open state. When the timeout expires, the breaker enters the half-open state, lets a portion of requests through, and counts how many of them succeed. The purpose of the half-open state is to let the breaker recover on its own while preventing a service that is still recovering from being overwhelmed again.
Commonly used circuit breaker components:
- Hystrix Circuit Breaker (No longer maintained)
- hystrix-go
- Resilience4j (recommended)
- Sentinel (recommended)
What is an adaptive circuit breaker
Based on the principle described above, using a circuit breaker well in a project usually requires the following parameters:
- Error ratio threshold: the breaker enters the open state when the threshold is reached.
- Timeout period: the breaker enters the half-open state after the timeout expires.
- Number of requests allowed in the half-open state.
- Window size (time).
In practice there are many more optional configuration parameters; see https://resilience4j.readme.io/docs/circuitbreaker
For less experienced developers, there is no guarantee that the values they choose will be appropriate.
So is there an adaptive circuit breaker algorithm that spares us from tuning these parameters and needs only simple configuration for most scenarios?
There is. Google SRE describes an adaptive circuit breaker algorithm that computes the probability of discarding a request from the quantities below.
Algorithm parameters:
- requests: total number of requests in the window
- accepts: number of requests handled successfully in the window
- K: sensitivity. The smaller K is, the more readily requests are dropped. A value between 1.5 and 2 is generally recommended.
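For reference, the rejection probability from the client-side throttling section of the Google SRE book (the same expression that appears in the go-zero code later in this article, apart from go-zero's extra protection term) can be written with the parameters above as:

```latex
P_{\text{reject}} = \max\left(0,\ \frac{\text{requests} - K \times \text{accepts}}{\text{requests} + 1}\right)
```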
Algorithm explanation:
- Under normal conditions requests == accepts, so the probability is 0.
- As the number of accepted requests falls, requests eventually exceeds K * accepts and the probability P rises above 0, so a portion of requests is dropped at random. The more severe the failure, the more requests are dropped; when accepts == 0 within the window, nearly every request is rejected and the breaker is effectively fully open.
- As the application recovers, accepts and requests both grow, but K * accepts grows faster than requests, so the probability soon returns to 0 and the breaker closes.
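The behavior described above can be checked with a few numbers. The sketch below assumes the formula max(0, (requests - K * accepts) / (requests + 1)); the function name rejectProbability is hypothetical and is not part of go-zero.

```go
package main

import (
	"fmt"
	"math"
)

// rejectProbability evaluates max(0, (requests - k*accepts) / (requests + 1)).
func rejectProbability(requests, accepts int64, k float64) float64 {
	return math.Max(0, (float64(requests)-k*float64(accepts))/float64(requests+1))
}

func main() {
	const k = 1.5
	// Healthy: every request is accepted, so nothing is rejected.
	fmt.Printf("healthy:  %.3f\n", rejectProbability(100, 100, k))
	// Degraded: half the requests fail, so roughly a quarter are rejected locally.
	fmt.Printf("degraded: %.3f\n", rejectProbability(100, 50, k))
	// Down: nothing is accepted, so almost every request is rejected.
	fmt.Printf("down:     %.3f\n", rejectProbability(100, 0, k))
}
```

Note that with K = 1.5 the breaker tolerates a failure rate of up to one third (accepts >= requests / 1.5) before it starts dropping anything.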
Code implementation
Next, consider how a circuit breaker might be implemented.
The initial ideas are as follows:
- Whatever the design, a breaker has to change state based on statistics, and those statistics should cover only the most recent period of time (older data has little reference value and wastes space), so a sliding time window is usually used to store them. The breaker's own state should also be exposed through these statistics to make it observable. Observability is the first thing to consider when building any system; otherwise the system is a black box.
- Requests to external services can end in many different ways, so the breaker needs a customizable function that decides whether a request succeeded. The decision might be based on http.code, rpc.code, or body.code. The breaker needs to collect this data in real time.
- When an external service is being short-circuited, callers often need custom fast-failure logic, so a custom fallback() hook should be provided.
Here is a step-by-step analysis of the go-zero source code:
core/breaker/breaker.go
Circuit breaker interface definition
Once the requirements are clear, we can start by planning and defining interfaces before writing any implementation. Defining interfaces is the first and most important step in turning design thinking into code.
The core definition contains two kinds of methods:
Allow(): the caller must report the result back to the breaker manually.
DoXXX(): the result of the request is reported back to the breaker automatically, like an automatic transmission. In fact, all DoXXX() methods are ultimately equivalent to calling:
DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
type (
	// Acceptable is a custom check that decides whether a request result counts as a success
	Acceptable func(err error) bool
	// Promise is used to report the result manually
	Promise interface {
		// Accept tells the Breaker that the call is successful.
		Accept()
		// Reject tells the Breaker that the call is failed.
		Reject(reason string)
	}
	Breaker interface {
		// Name returns the breaker name
		Name() string
		// Allow checks whether the request may proceed; the caller must report the result manually.
		// Suitable for simple scenarios that need neither fast failure nor a custom result check.
		// Equivalent to a manual transmission
		Allow() (Promise, error)
		// Do runs req and reports the result automatically.
		// Equivalent to an automatic transmission
		Do(req func() error) error
		// DoWithAcceptable runs req under the breaker.
		// acceptable - custom check of the execution result
		DoWithAcceptable(req func() error, acceptable Acceptable) error
		// DoWithFallback runs req under the breaker.
		// fallback - custom fast-failure handler
		DoWithFallback(req func() error, fallback func(err error) error) error
		// DoWithFallbackAcceptable runs req under the breaker.
		// fallback - custom fast-failure handler
		// acceptable - custom check of the execution result
		DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
	}
)
Circuit breaker implementation
circuitBreaker embeds throttle, which makes it essentially a static proxy. The proxy pattern adds functionality without changing the original object. As we will see later, go-zero does this in order to collect the breaker's error data, that is, to achieve observability.
Implementing the breaker as a static proxy can look a little convoluted at first.
type (
	// Circuit breaker structure
	circuitBreaker struct {
		name string
		// circuitBreaker delegates the real work to throttle
		throttle
	}
	// Internal breaker interface
	throttle interface {
		// Breaker method (manual reporting)
		allow() (Promise, error)
		// Breaker method (automatic reporting);
		// every DoXXX() method ends up calling this one
		doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
	}
)
func (cb *circuitBreaker) Allow() (Promise, error) {
	return cb.throttle.allow()
}
func (cb *circuitBreaker) Do(req func() error) error {
	return cb.throttle.doReq(req, nil, defaultAcceptable)
}
func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
	return cb.throttle.doReq(req, nil, acceptable)
}
func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
	return cb.throttle.doReq(req, fallback, defaultAcceptable)
}
func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
	acceptable Acceptable) error {
	return cb.throttle.doReq(req, fallback, acceptable)
}
Implementation of the throttle interface:
loggedThrottle adds a rolling window that collects error logs when requests fail.
// Breaker with logging
type loggedThrottle struct {
	// name
	name string
	// The proxied object
	internalThrottle
	// Rolling window that collects recent errors; equivalent to a circular array
	errWin *errorWindow
}
// Breaker method (manual reporting)
func (lt loggedThrottle) allow() (Promise, error) {
	promise, err := lt.internalThrottle.allow()
	return promiseWithReason{
		promise: promise,
		errWin:  lt.errWin,
	}, lt.logError(err)
}
// Breaker method (automatic reporting)
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
	return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
		accept := acceptable(err)
		// Record the error text whenever the result is judged a failure
		if !accept {
			lt.errWin.add(err.Error())
		}
		return accept
	}))
}
func (lt loggedThrottle) logError(err error) error {
	if err == ErrServiceUnavailable {
		// if circuit open, not possible to have empty error window
		stat.Report(fmt.Sprintf(
			"proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
			proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
	}
	return err
}
Error log collection: errorWindow
errorWindow is a circular array in which new entries continually overwrite the oldest ones; the wrap-around is implemented with the modulo operation.
// Rolling window of recent error messages
type errorWindow struct {
	reasons [numHistoryReasons]string
	index   int
	count   int
	lock    sync.Mutex
}
// Add an entry
func (ew *errorWindow) add(reason string) {
	ew.lock.Lock()
	// Store the error message with a timestamp
	ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
	// Advance index for the next write;
	// the modulo makes it wrap around, which implements the rolling behavior
	ew.index = (ew.index + 1) % numHistoryReasons
	// Track the number of stored entries, capped at the capacity
	ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
	ew.lock.Unlock()
}
// Format the error log
func (ew *errorWindow) String() string {
	var reasons []string
	ew.lock.Lock()
	// Walk backwards so the newest entry comes first
	for i := ew.index - 1; i >= ew.index-ew.count; i-- {
		reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
	}
	ew.lock.Unlock()
	return strings.Join(reasons, "\n")
}
We have not seen the actual circuit breaking logic yet; the real work is delegated to the internalThrottle object.
type internalThrottle interface {
	allow() (internalPromise, error)
	doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}
googleBreaker implements the internalThrottle interface. Its structure is defined as follows:
type googleBreaker struct {
	// Sensitivity; the default value in go-zero is 1.5
	k float64
	// Sliding window that records the total and successful request counts for the recent period
	stat *collection.RollingWindow
	// Probability generator:
	// draws a random number between 0.0 and 1.0
	proba *mathx.Proba
}
As you can see, the breaker has very few fields; the statistics are kept in a sliding time window.
RollingWindow: the sliding window
A sliding window is a general-purpose data structure, often used to keep statistics about behavior over a recent period of time.
The implementation is quite interesting, especially the way it simulates the sliding of the window.
First, let's look at the structure definition of the sliding window:
type RollingWindow struct {
	// Mutex
	lock sync.RWMutex
	// Number of buckets in the window
	size int
	// The window, i.e. the data container
	win *window
	// Time span covered by one bucket
	interval time.Duration
	// Cursor that locates the bucket currently being written
	offset int
	// Whether to ignore the bucket currently being written when aggregating.
	// That bucket may not yet cover a full interval,
	// so its statistics may be inaccurate
	ignoreCurrent bool
	// Start time of the bucket that was last written.
	// Used on the next write to calculate
	// how many intervals have elapsed
	lastTime time.Duration
}
The window is where the data is actually stored. It is essentially an array that supports adding data at an offset and clearing the bucket at an offset. The array is divided into buckets, one per interval.
// Time window
type window struct {
	// Buckets;
	// each bucket represents one interval
	buckets []*Bucket
	// Window size (number of buckets)
	size int
}
// Add data
// offset - cursor that locates the bucket to write
// v - the value to record
func (w *window) add(offset int, v float64) {
	w.buckets[offset%w.size].add(v)
}
// Aggregate data
// fn - custom per-bucket statistics function
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
	for i := 0; i < count; i++ {
		fn(w.buckets[(start+i)%w.size])
	}
}
// Clear a specific bucket
func (w *window) resetBucket(offset int) {
	w.buckets[offset%w.size].reset()
}
// Bucket
type Bucket struct {
	// Sum of the values added to this bucket
	Sum float64
	// Number of times add was called on this bucket
	Count int64
}
// Add data to the bucket
func (b *Bucket) add(v float64) {
	// Accumulate the sum
	b.Sum += v
	// Increment the count
	b.Count++
}
// Clear the bucket
func (b *Bucket) reset() {
	b.Sum = 0
	b.Count = 0
}
Adding data to the window:
- Compute how much time has elapsed since the last write, expressed as a number of buckets; these buckets have expired.
- Clear the data of the expired buckets.
- Update offset; updating offset is what simulates the sliding of the window.
- Add the data.
// Add data
func (rw *RollingWindow) Add(v float64) {
	rw.lock.Lock()
	defer rw.lock.Unlock()
	// Move the cursor to the bucket for the current time
	rw.updateOffset()
	// Add data
	rw.win.add(rw.offset, v)
}
// Calculate how many intervals have elapsed since the last write,
// that is, how many buckets have been passed
func (rw *RollingWindow) span() int {
	offset := int(timex.Since(rw.lastTime) / rw.interval)
	if 0 <= offset && offset < rw.size {
		return offset
	}
	// More than a full window has elapsed: return the window size
	return rw.size
}
// Update the current offset,
// which implements the sliding of the window
func (rw *RollingWindow) updateOffset() {
	// Number of buckets passed since the last write
	span := rw.span()
	// Still within the same interval: nothing to update
	if span <= 0 {
		return
	}
	offset := rw.offset
	// No data was written during the buckets that were passed,
	// so whatever they hold is expired and can be cleared.
	// The % operation makes the index wrap around:
	// past the end of the array it starts again from the beginning,
	// which gives the effect of a circular array
	for i := 0; i < span; i++ {
		rw.win.resetBucket((offset + i + 1) % rw.size)
	}
	// Update the offset
	rw.offset = (offset + span) % rw.size
	now := timex.Now()
	// Update the last write time.
	// Note that it is aligned to the start of the current bucket rather than set to now
	rw.lastTime = now - (now-rw.lastTime)%rw.interval
}
Window statistics:
// Aggregate the data
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
	rw.lock.RLock()
	defer rw.lock.RUnlock()
	var diff int
	span := rw.span()
	// Number of buckets that have not yet expired at the current time
	if span == 0 && rw.ignoreCurrent {
		diff = rw.size - 1
	} else {
		diff = rw.size - span
	}
	if diff > 0 {
		// Buckets rw.offset+1 through rw.offset+span have expired and must not be counted
		offset := (rw.offset + span + 1) % rw.size
		// Aggregate data
		rw.win.reduce(offset, diff, fn)
	}
}
How googleBreaker decides whether to reject a request
- Collect the statistics within the sliding window
- Calculate the rejection probability
// Decide whether to reject the request based on the most recent request data
func (b *googleBreaker) accept() error {
	// Get the statistics for the most recent period
	accepts, total := b.history()
	// Calculate the dynamic rejection probability
	weightedAccepts := b.k * float64(accepts)
	// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
	// protection is a constant defined elsewhere in the package (not shown here)
	dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
	// Probability is 0: let the request through
	if dropRatio <= 0 {
		return nil
	}
	// Draw a random number between 0.0 and 1.0 and compare it with the probability above;
	// if the random number is smaller than the rejection probability, reject the request
	if b.proba.TrueOnProba(dropRatio) {
		return ErrServiceUnavailable
	}
	return nil
}
googleBreaker circuit breaking logic
Two kinds of methods are exposed to callers:
- For simple scenarios, where the caller reports the execution result to the breaker manually after executing the request:
func (b *googleBreaker) allow() (internalPromise, error)
- For more complex scenarios, where you can supply a custom fast-failure function and a custom check for whether the request succeeded, and the result is reported to the breaker automatically:
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
The Acceptable parameter lets you define what counts as a successful request.
Acceptable func(err error) bool
// Breaker method (manual reporting)
// Returns a promise object through which the caller reports the result to the breaker
func (b *googleBreaker) allow() (internalPromise, error) {
	if err := b.accept(); err != nil {
		return nil, err
	}
	return googlePromise{
		b: b,
	}, nil
}
// Breaker method (automatic reporting)
// req - the request to protect
// fallback - custom fast-failure function; it can wrap the error produced by the breaker and return it
// acceptable - custom check of the result when the request was actually executed, e.g. based on http.code, rpc.code, body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
	// Decide whether to reject the request
	if err := b.accept(); err != nil {
		// If there is a custom fallback, execute it
		if fallback != nil {
			return fallback(err)
		}
		return err
	}
	// If req() panics, the request still counts as a failure and is reported to the breaker
	defer func() {
		if e := recover(); e != nil {
			b.markFailure()
			panic(e)
		}
	}()
	// Execute the request
	err := req()
	// Decide whether the request succeeded
	if acceptable(err) {
		b.markSuccess()
	} else {
		b.markFailure()
	}
	return err
}
// Report success
func (b *googleBreaker) markSuccess() {
	b.stat.Add(1)
}
// Report failure
func (b *googleBreaker) markFailure() {
	b.stat.Add(0)
}
// Statistics for the window
func (b *googleBreaker) history() (accepts, total int64) {
	b.stat.Reduce(func(b *collection.Bucket) {
		accepts += int64(b.Sum)
		total += b.Count
	})
	return
}
References
Microsoft Azure documentation on the circuit breaker design pattern
Sony's open-source circuit breaker implementation, based on Microsoft's documentation
go-zero adaptive circuit breaker documentation