Why do we need fuses

In a microservice cluster, almost every application depends on a number of external services. At any time, a dependency may suffer from a slow network, timeouts, overload, or outright unavailability. Under high concurrency, if the caller takes no action and keeps sending requests to the faulty service, the failure can easily cascade into an avalanche across the whole cluster. For example, in a high-concurrency scenario, an order service typically depends on the following services:

  1. Product service
  2. Account service
  3. Inventory service

If the account service is overloaded, the order service can only wait passively for the account service to return an error or for the request to time out. Order requests pile up, and these doomed requests still occupy system resources: CPU, memory, data connections, and so on. The order service becomes unavailable, and even after the account service recovers, the order service cannot recover on its own.

If there were an active protection mechanism for this scenario, the order service could at least keep itself running, and it could recover automatically once the account service is restored. In service governance, this self-protection mechanism is called a circuit breaker.

Circuit breaking (fusing)

A circuit breaker is a mechanism by which the caller protects itself (and, as a side effect, the called party); its target is an external service.

Degradation

Degradation is the called party's (the service provider's) self-protection mechanism against overload caused by its own lack of resources; its target is the service itself.

The word “fuse” comes from the electrical fuse in everyday life: when the load (the current) is too high, the fuse blows automatically so that the circuit does not burn out. Many techniques are borrowed from real-life scenarios like this.

How it works

A fuse generally has three states:

  1. Closed: the default state, in which requests reach the target service. The numbers of successful and failed requests within the window are counted, and when the error rate reaches the threshold, the fuse opens.
  2. Open: requests fail immediately with an error; if a fallback is configured, the fallback method is called directly.
  3. Half-open: when the fuse opens, it starts a timeout timer. When the timeout expires, it enters the half-open state and lets a portion of requests through normally, counting how many succeed. The purpose of the half-open state is to let the system recover on its own while keeping a recovering service from being knocked over again.

Commonly used circuit breaker components:

  1. Hystrix circuit breaker (no longer maintained)
  2. hystrix-go
  3. Resilience4j (recommended)
  4. Sentinel (recommended)

What is an adaptive fuse

Based on the principle above, using a fuse well in a project usually requires setting the following parameters:

  1. Error rate threshold: the fuse opens when this threshold is reached.
  2. Open timeout: after this period, the fuse enters the half-open state.
  3. Number of requests allowed through in the half-open state.
  4. Size of the statistics window.

In fact, there are many more configuration options; see https://resilience4j.readme.io/docs/circuitbreaker

For less experienced developers, there is no guarantee that these settings will be appropriate.

So is there an adaptive circuit breaker algorithm that lets us ignore most parameters and use a simple configuration that works for most scenarios?

There is. The Google SRE book describes an adaptive client-side throttling algorithm that computes the probability of rejecting a request:

p = max(0, (requests - K * accepts) / (requests + 1))

Algorithm parameters:

  1. requests: the total number of requests in the window
  2. accepts: the number of requests accepted (successful) in the window
  3. K: sensitivity. The smaller K is, the more readily requests are dropped; a value between 1.5 and 2 is generally recommended.

Algorithm explanation:

  1. Normally requests == accepts, so with K > 1 the probability is 0.
  2. As the number of accepted requests falls and requests exceeds K * accepts, the probability becomes greater than 0 and a fraction of requests is dropped. The more serious the failures, the more requests are dropped; if accepts == 0 within the window, the probability approaches 1 and the fuse is effectively fully open.
  3. As the downstream service recovers, accepts and requests both grow, but K * accepts grows faster than requests (because K > 1), so the probability quickly returns to 0 and the fuse closes.

Code implementation

Next, consider how a fuse might be implemented.

The initial ideas are as follows:

  1. Any fuse has to change state based on metric statistics, and those statistics usually only need to cover the most recent period (older data has no reference value and wastes space), so a sliding time window is typically used to store them. The fuse's state also relies on these statistics for observability. Observability is the first thing to consider when building any system; otherwise the system is a black box.
  2. A request to an external service can end in many ways, so the caller needs a way to supply a custom function that decides whether a request succeeded, for example based on http.code, rpc.code, or body.code. The fuse needs to collect these results in real time.
  3. When an external service is broken, users often need custom fast-failure logic, so a custom fallback() hook should be provided.

Below is a step-by-step analysis of the go-zero source code:

core/breaker/breaker.go

Fuse interface definition

Once the requirements are clear, we can start planning and defining interfaces. Defining interfaces is the first and most important step in abstracting our design into code.

The core definition contains two kinds of methods:

Allow(): the caller must report the result back to the fuse manually (like a manual transmission).

DoXXX(): reports the result of a request to the fuse automatically, like an automatic transmission. All DoXXX() methods ultimately call:

DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error

	// Custom decision on whether an execution result counts as success
	Acceptable func(err error) bool
	
	// Manual callback
	Promise interface {
		// Accept tells the Breaker that the call is successful.
		// The request succeeded
		Accept()
		// Reject tells the Breaker that the call is failed.
		// The request failed
		Reject(reason string)
	}	

	Breaker interface {
		// Fuse name
		Name() string

		// The result of executing the request must be reported manually
		// It is suitable for simple scenarios where quick failure is not required and the result of a request is not required to be customized
		// Equivalent to manual gear...
		Allow() (Promise, error)

		// Automatic reporting of execution results
		// Automatic gear...
		Do(req func() error) error

		// Fuse method
		// acceptable - supports a custom decision on the execution result
		DoWithAcceptable(req func() error, acceptable Acceptable) error

		// Fuse method
		// fallback - supports custom fast failure
		DoWithFallback(req func() error, fallback func(err error) error) error

		// Fuse method
		// fallback - supports custom fast failure
		// acceptable - supports a custom decision on the execution result
		DoWithFallbackAcceptable(req func() error, fallback func(err error) error, acceptable Acceptable) error
	}


Fuse implementation

circuitBreaker delegates all of its work to throttle; it is essentially a static proxy. The proxy pattern adds behavior without changing the original object. As we will see later, go-zero does this to collect fuse error data, that is, to achieve observability.

Implementing the fuse as a static proxy looks a bit convoluted.

	// Fuse structure
	circuitBreaker struct {
		name string
		// In fact, the circuitBreaker is delegated to throttle
		throttle
	}
	// Fuse interface
	throttle interface {
		// Fuse method
		allow() (Promise, error)
		// Fuse method
		// All DoXXX() methods end up calling this method
		doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
	}

func (cb *circuitBreaker) Allow() (Promise, error) {
	return cb.throttle.allow()
}

func (cb *circuitBreaker) Do(req func() error) error {
	return cb.throttle.doReq(req, nil, defaultAcceptable)
}

func (cb *circuitBreaker) DoWithAcceptable(req func() error, acceptable Acceptable) error {
	return cb.throttle.doReq(req, nil, acceptable)
}

func (cb *circuitBreaker) DoWithFallback(req func() error, fallback func(err error) error) error {
	return cb.throttle.doReq(req, fallback, defaultAcceptable)
}

func (cb *circuitBreaker) DoWithFallbackAcceptable(req func() error, fallback func(err error) error,
	acceptable Acceptable) error {
	return cb.throttle.doReq(req, fallback, acceptable)
}
	

Implementation of the throttle interface:

loggedThrottle adds a rolling window that records error messages when requests fail.


// Fuse with log function
type loggedThrottle struct {
	// Name
	name string
	// Proxy object
	internalThrottle
	// Scroll window, scroll to collect data, equivalent to circular array
	errWin *errorWindow
}

// Fuse method
func (lt loggedThrottle) allow() (Promise, error) {
	promise, err := lt.internalThrottle.allow()
	return promiseWithReason{
		promise: promise,
		errWin:  lt.errWin,
	}, lt.logError(err)
}

// Fuse method
func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
	return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
		accept := acceptable(err)
		if !accept {
			lt.errWin.add(err.Error())
		}
		return accept
	}))
}

func (lt loggedThrottle) logError(err error) error {
	if err == ErrServiceUnavailable {
		// if circuit open, not possible to have empty error window
		stat.Report(fmt.Sprintf(
			"proc(%s/%d), callee: %s, breaker is open and requests dropped\nlast errors:\n%s",
			proc.ProcessName(), proc.Pid(), lt.name, lt.errWin))
	}

	return err
}

Error log collection: errorWindow

errorWindow is a circular array: new entries keep overwriting the oldest ones, and the wrap-around is implemented with the modulo (remainder) operation.

// Scroll window
type errorWindow struct {
	reasons [numHistoryReasons]string
	index   int
	count   int
	lock    sync.Mutex
}

// Add data
func (ew *errorWindow) add(reason string) {
	ew.lock.Lock()
	// Add an error log
	ew.reasons[ew.index] = fmt.Sprintf("%s %s", timex.Time().Format(timeFormat), reason)
	// Update index in preparation for the next write
	// The function of scrolling is implemented
	ew.index = (ew.index + 1) % numHistoryReasons
	// Count the quantity
	ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
	ew.lock.Unlock()
}

// Format error log
func (ew *errorWindow) String() string {
	var reasons []string

	ew.lock.Lock()
	// Reverse order: newest entry first
	for i := ew.index - 1; i >= ew.index-ew.count; i-- {
		reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
	}
	ew.lock.Unlock()

	return strings.Join(reasons, "\n")
}
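
The wrap-around arithmetic in add and String can be checked with a small capacity. This simplified copy (ringLog, with a capacity of 3 and no timestamps, is our illustration, not go-zero code) keeps the same indexing:

```go
package main

import (
	"strings"
	"sync"
)

const historySize = 3 // hypothetical small capacity for the demo

// ringLog is a simplified errorWindow: a fixed-size circular buffer that
// overwrites the oldest entry and prints the newest entry first.
type ringLog struct {
	mu      sync.Mutex
	reasons [historySize]string
	index   int // next slot to write
	count   int // number of valid entries, capped at historySize
}

func (r *ringLog) add(reason string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.reasons[r.index] = reason
	r.index = (r.index + 1) % historySize
	if r.count < historySize {
		r.count++
	}
}

func (r *ringLog) String() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]string, 0, r.count)
	// Walk backwards from the most recent write; adding historySize before
	// the modulo keeps the index non-negative when i < 0.
	for i := r.index - 1; i >= r.index-r.count; i-- {
		out = append(out, r.reasons[(i+historySize)%historySize])
	}
	return strings.Join(out, "\n")
}
```

After writing a, b, c, d into three slots, d has overwritten a, and String walks backwards from the most recent write, so it returns d, c, b.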

So far we haven't seen the actual fuse logic; it is delegated to the internalThrottle object.

	internalThrottle interface {
		allow() (internalPromise, error)
		doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
	}

The internalThrottle interface is implemented by googleBreaker; here is its struct definition:

type googleBreaker struct {
	// Sensitivity, the default value in Go-Zero is 1.5
	k float64
	// A sliding window is used to record the total number of requests and successes in a recent period of time
	stat *collection.RollingWindow
	// Probability generator
	// Generate a random number between 0.0 and 1.0
	proba *mathx.Proba
}

As you can see, the fuse's fields are quite simple; the statistics are collected with a sliding time window.

RollingWindow Sliding window

A sliding window is a general-purpose data structure that is often used to collect statistics on behavior over a recent period of time.

The implementation is very interesting, especially how to simulate the window-sliding process.

First, let’s look at the structure definition of sliding window:

	RollingWindow struct {
		// Mutex
		lock sync.RWMutex
		// Number of buckets in the window
		size int
		// Window, the data container
		win *window
		// Time span covered by each bucket
		interval time.Duration
		// cursor, used to locate which bucket should be written currently
		offset int
		// Whether to ignore the data being written to the bucket when summarizing data
		// In some scenarios the bucket data is being written without a full window interval
		// The current bucket statistics may be inaccurate
		ignoreCurrent bool
		// The last time to write to the bucket
		// It is used to calculate the interval between the next data write
		// How many time intervals have passed
		lastTime      time.Duration 
	}

window is where the data is actually stored. It is essentially an array, written to and cleared by offset, whose elements (buckets) each cover one interval.

// Time window
type window struct {
	// Buckets
	// A bucket identifies an interval
	buckets []*Bucket
	// Window size
	size int
}

// Add data
//offset - cursor to locate write bucket position
//v - Behavior data
func (w *window) add(offset int, v float64) {
	w.buckets[offset%w.size].add(v)
}

// Aggregate data
//fn - Custom bucket statistics function
func (w *window) reduce(start, count int, fn func(b *Bucket)) {
	for i := 0; i < count; i++ {
		fn(w.buckets[(start+i)%w.size])
	}
}

// Clear a specific bucket
func (w *window) resetBucket(offset int) {
	w.buckets[offset%w.size].reset()
}

// Bucket
type Bucket struct {
	// Sum of current bucket values
	Sum float64
	// Total number of add times for the bucket
	Count int64
}

// Add data to the bucket
func (b *Bucket) add(v float64) {
	// Sum
	b.Sum += v
	// Count + 1
	b.Count++
}

// The bucket data is cleared
func (b *Bucket) reset() {
	b.Sum = 0
	b.Count = 0
}

Adding data to the window:

  1. Compute how much time has passed since the last write; this is essentially the number of buckets that have expired.
  2. Clear the expired buckets.
  3. Update offset; updating offset is what simulates the window sliding.
  4. Add the data.

// Add data
func (rw *RollingWindow) Add(v float64) {
	rw.lock.Lock()
	defer rw.lock.Unlock()
	// Slide the window if needed and locate the bucket to write
	rw.updateOffset()
	// Add data
	rw.win.add(rw.offset, v)
}

// Calculate how many units of time elapsed since the last data was written
// How many barrels did you pass
func (rw *RollingWindow) span(a) int {
	offset := int(timex.Since(rw.lastTime) / rw.interval)
	if 0 <= offset && offset < rw.size {
		return offset
	}
	// Return the window size if it is larger than the time window
	return rw.size
}

// Update the current offset
// Implement window sliding
func (rw *RollingWindow) updateOffset() {
	// Number of buckets elapsed
	span := rw.span()
	// Still within the same interval: nothing to update
	if span <= 0 {
		return
	}
	offset := rw.offset
	// No data was written after span
	// Then the data in these buckets should not be retained, it is expired data can be emptied
	// You can see that the % mod operation is used to implement periodic subscripts
	// If it exceeds the index, start from the beginning to ensure that the new data can be written properly
	// The effect is similar to loop array
	for i := 0; i < span; i++ {
		rw.win.resetBucket((offset + i + 1) % rw.size)
	}
	// Update the offset
	rw.offset = (offset + span) % rw.size
	now := timex.Now()
	// Update the operation time
	// Round down to the start of the current interval instead of using now directly
	rw.lastTime = now - (now-rw.lastTime)%rw.interval
}

Window statistics:

// Summarize the data
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
	rw.lock.RLock()
	defer rw.lock.RUnlock()

	var diff int
	span := rw.span()
	// Number of unexpired buckets to aggregate; with ignoreCurrent set, the bucket still being written is skipped
	if span == 0 && rw.ignoreCurrent {
		diff = rw.size - 1
	} else {
		diff = rw.size - span
	}
	if diff > 0 {
		// The bucket data between rw.offset - rw.offset+span is expired and should not be counted in statistics
		offset := (rw.offset + span + 1) % rw.size
		// Aggregate data
		rw.win.reduce(offset, diff, fn)
	}
}

How googleBreaker decides whether to drop a request:

  1. Collect statistics from the sliding window.
  2. Calculate the drop probability.

// Calculate whether to fuse according to the latest request data
func (b *googleBreaker) accept() error {
	// Get statistics for the most recent period
	accepts, total := b.history()
	// Calculate the dynamic fusing probability
	weightedAccepts := b.k * float64(accepts)
	// https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
	dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
	// Probability 0, pass
	if dropRatio <= 0 {
		return nil
	}
	// Draw a random number in [0.0, 1.0) and compare it with the drop ratio
	// If the random number is below the drop ratio, reject the request
	if b.proba.TrueOnProba(dropRatio) {
		return ErrServiceUnavailable
	}

	return nil
}
googleBreaker fuse logic implementation

Two kinds of fuse methods are exposed:

  1. For simple scenarios, you report the execution result to the fuse manually after executing the request.

func (b *googleBreaker) allow() (internalPromise, error)

  2. For complex scenarios, you can customize the fast-failure method and the check for whether a request succeeded, and the execution result is reported to the fuse automatically.

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error

The acceptable parameter customizes the decision on whether a request counts as successful.

Acceptable func(err error) bool
// Fuse method
// Returns a Promise callback object that the developer uses to report the result to the fuse manually
func (b *googleBreaker) allow() (internalPromise, error) {
	if err := b.accept(); err != nil {
		return nil, err
	}

	return googlePromise{
		b: b,
	}, nil
}

// Fuse method
// req - the request protected by the fuse
// fallback - a custom fast-failure function that can wrap the error produced by the fuse and return it
// acceptable - custom decision on whether the request result counts as success, e.g. based on http.code, rpc.code, or body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
	// Determine whether to drop the request
	if err := b.accept(); err != nil {
		// If there is a custom fallback, execute it
		if fallback != nil {
			return fallback(err)
		}

		return err
	}
	// If panic occurs during req(), the system still determines that the req fails and reports it to the fuse
	defer func() {
		if e := recover(); e != nil {
			b.markFailure()
			panic(e)
		}
	}()
	// Execute the request
	err := req()
	// Determine that the request was successful
	if acceptable(err) {
		b.markSuccess()
	} else {
		b.markFailure()
	}

	return err
}

// Report success
func (b *googleBreaker) markSuccess() {
	b.stat.Add(1)
}

// Report failure
func (b *googleBreaker) markFailure() {
	b.stat.Add(0)
}

// Statistics
func (b *googleBreaker) history() (accepts, total int64) {
	b.stat.Reduce(func(b *collection.Bucket) {
		accepts += int64(b.Sum)
		total += b.Count
	})

	return
}
