What is a circuit breaker?

A lot of people ask: What is a circuit breaker?

Encyclopedia explanation:

A circuit breaker (also called a trading curb) is a measure an exchange takes to suspend trading in order to control risk when a stock index's movement reaches a specified circuit-breaker threshold.

Plain-language translation:

You bring 1,000 yuan to play mahjong and lose every single round. At that point you stop for 10 minutes, giving yourself time to go to the ATM and withdraw another 1,000; then you bring the money back to the table 500 at a time, in two batches. If you win twice in a row you keep playing; otherwise you stop again, withdraw again, and keep playing in batches, and so on.

  • The pause keeps the panic from spreading and clouding your judgment
  • Playing in batches keeps things gradual and avoids losing everything in one all-in

Circuit breakers in service governance:

When the caller invokes a service, if the error rate of the responses exceeds a threshold (or some other policy is triggered), subsequent requests are no longer actually sent; instead, an error is returned directly on the caller's side.

If you've read the previous article in this microservice design series, you may have noticed that circuit breaking and rate limiting look similar. The biggest difference:

  • Rate limiting is overload protection that the server sets up based on its own capacity; it guards the server against external traffic.

  • A circuit breaker is degradation protection on the calling side; it guards the caller itself, internally.

Note: a service that can be fused must not be a required service on the core link. If it were, then the moment it timed out or went down the foreground service would become unusable, and that is not what a circuit breaker is for. So circuit breaking is really a form of degradation.

Why use circuit breakers?

Service-to-service dependencies are common in microservices and in system architectures generally. If a downstream service fails while its callers keep requesting and retrying, the downstream service can easily be pushed into total failure; on top of that, the callers themselves may become unable to serve because of the pile-up of outstanding requests.

The diagram below:

  • Minute 1: Service C becomes abnormal and stops responding; Service B retries again and again.
  • Minute 2: Because Service C keeps failing to respond, Service B retries until its thread pool fills up, and Service A starts retrying against B.
  • Minute 3: Service B stops responding, and Service A becomes unavailable.

This is the snowballing service avalanche you get when circuit breakers are not in place.


How to design a circuit breaker?

A circuit breaker is middleware that sits on the path where the caller communicates with the server: it monitors the quality of the downstream service and trips according to a policy.

The diagram below:

Upstream service A initiates a call to downstream service B, and the call is first handled by the Breaker middleware.

The Breaker belongs to upstream service A; it is protection for the caller itself.

The main flow is divided into three steps: Before, Call, and After. The breaker's construction is described in detail below.

Circuit breaker structure

  • The state machine
  • Sliding counter
  • The three run steps

The state machine

The circuit breaker's internal state machine has three states:

  • Close: the breaker is closed

    The caller accesses the server normally

  • Open: the breaker is open

    The breaker blocks the caller's access to the server

  • Half Open: the breaker is half open

    A small amount of caller traffic is let through to the server to check whether the server has recovered
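
In the demo analyzed below, these three states map onto a small State type. Its definition is not shown in this article; judging from the constants (STATE_CLOSED, STATE_OPEN, STATE_HALFOPEN) and the state.Name() call that appear later, it presumably looks roughly like this sketch:

// Sketch only: the demo's actual State definition is not shown in the article;
// this is inferred from the identifiers used in the code excerpts below.
type State int

const (
	STATE_CLOSED   State = iota // breaker closed: requests pass through normally
	STATE_OPEN                  // breaker open: requests are rejected immediately
	STATE_HALFOPEN              // breaker half open: only a few probe requests are allowed
)

func (s State) Name() string {
	switch s {
	case STATE_OPEN:
		return "open"
	case STATE_HALFOPEN:
		return "half-open"
	default:
		return "closed"
	}
}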

The diagram below:

Init -> Close: the breaker is initialized to Close.

Close -> Open: the breaker moves from Close to Open when the downstream service is judged to be failing.

What counts as a service exception is defined by the upstream (calling) service itself, for example:

  • The request to the server times out
  • The server returns an HTTP code other than 2xx
  • A business-defined error, e.g. errNo > 0

The circuit breaker policy is also customized, for example:

  • Number of request errors >N
  • Request error ratio >N%
  • Number of consecutive request errors >N

Open -> Half Open: once the cooling period has passed, the breaker is ready to try restoring service and the state becomes Half Open.

Cooling period: for a user-defined span of time after the breaker opens, every request is rejected.

Half Open -> Open: if the breaker is Half Open and the downstream service still looks abnormal, the breaker opens again.

Half Open -> Close: if the breaker is Half Open and the recovery condition is met, the breaker returns to Close.

Recovery conditions are likewise defined by the caller, for example:

  • The number of consecutive successes is greater than N
  • The ratio of consecutive successful requests is greater than N%

Sliding counter

The breaker's tripping and recovery strategies are both based on request counts, and there is a set of counters for each sliding time window.

In other words, the breaker trips when a counter reaches a threshold within a given time window. For example, with the configuration used in the test at the end of this article (a 2-second window and a fail-count threshold of 1), a single failure inside any one window opens the breaker.

The diagram below:

Each node of TimeLine is a time window, and each time window corresponds to a set of counters.

Note:

The window does not slide only with the forward passage of time; state-machine transitions also actively slide the window (see the Change function later).

The three run steps

As mentioned above, the breaker's run loop is divided into three main steps:

  • Before

    State machine status check and traffic interception

  • Call

    The request is proxied through to the target server

  • After

    Counter statistics and status updates are performed based on the Response returned from Call


Source code Demo analysis

An article deserves source code to go with it, so here it is!

Demo address: github.com/xiaoxuz/bre…

Breaker structure
type Breaker struct {
	m            sync.Mutex
	state        State
	Metrics      *Metrics
	Strategy     *BreakStrategyConfig
	HalfMaxCalls int64
	OpenTime     time.Time
	CoolingTime  time.Duration
}
  • m: a mutex (sync.Mutex) that guards the breaker
  • state: the breaker's current state
  • Metrics: the counters
  • Strategy: the circuit breaking policy
  • HalfMaxCalls: the maximum number of requests allowed in the half-open state, which also serves as the recovery threshold
  • OpenTime: the time at which the breaker opened
  • CoolingTime: how long the breaker stays open (the cooling period)
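
The Config struct and the NewBreaker constructor used in the Go Test section later are not shown in the article. Based on the Breaker fields above and the NewBreaker(Config{...}) call in the test, a sketch of them might look like this (NewMetrics is an assumed helper, not taken from the demo):

// Sketch only: inferred from the Breaker fields above and the
// NewBreaker(Config{...}) call in the test section; not the demo's verbatim code.
type Config struct {
	HalfMaxCalls int64                // max probe requests in the half-open state
	WindowSize   time.Duration        // sliding window size
	Strategy     *BreakStrategyConfig // circuit breaking policy configuration
	CoolingTime  time.Duration        // how long the breaker stays open before probing
}

func NewBreaker(c Config) *Breaker {
	return &Breaker{
		state:        STATE_CLOSED,
		Metrics:      NewMetrics(c.WindowSize), // assumed helper that initializes Window.Size
		Strategy:     c.Strategy,
		HalfMaxCalls: c.HalfMaxCalls,
		CoolingTime:  c.CoolingTime,
	}
}
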
The Metrics structure
type Metrics struct {
	MetricsID int64   // counter ID
	Win       *Window // sliding time window
	Norm      *Norm   // statistics
}
type Window struct {
	Size      time.Duration // window size
	StartTime time.Time     // window start time
}
type Norm struct {
	AllCnt            int64 // total requests
	SuccCnt           int64 // successes
	FailCnt           int64 // failures
	ContinuousSuccCnt int64 // consecutive successes
	ContinuousFailCnt int64 // consecutive failures
}

The counter is made up of two parts:

  • *Window: the sliding time window
  • *Norm: the statistics
Breaker main process
// main
func (b *Breaker) Call(f func() (interface{}, error)) (interface{}, error) {
	// lock
	b.m.Lock()
	defer b.m.Unlock()

	// Pre-check
	if err := b.Before(); err != nil {
		return nil, err
	}

	// call
	b.Metrics.Call()
	response, err := f()

	// post-processing
	b.After(err == nil)

	return response, nil
}

A sync.Mutex serializes the whole flow, executing Before -> f() -> After under the lock. (Note that this also means every call through the same breaker, including the downstream request itself, is serialized.)

Before logic

Pre-call state-machine check and traffic interception.

How is it checked and intercepted? First look at the code:

func (b *Breaker) Before() error {
	now := time.Now()

	switch b.state {
	case STATE_OPEN:
		// If the cooling period is exceeded, adjust to the half-open state
		if b.OpenTime.Add(b.CoolingTime).Before(now) {
			b.Change(STATE_HALFOPEN, now)
			return nil
		}
		// Service will be denied if the cooling period has not expired
		return ERR_SERVICE_BREAK
		break
	case STATE_HALFOPEN:
		// If the number of requests exceeds the half-open limit, service is denied
		if b.Metrics.Norm.AllCnt >= b.HalfMaxCalls {
			return ERR_SERVICE_BREAK_HALFOPEN
		}
		break
	//case STATE_CLOSED:
	default:
		// If the window's start time has already passed, slide the window
		if b.Metrics.Win.StartTime.Before(now) {
			b.Metrics.Restart(now.Add(b.Metrics.Win.Size))
		}
		return nil
	}
	return nil
}

Based on the current state:

  • Open

    Check whether the cooling period has passed. If it has, switch to Half Open; otherwise reject the request and return errors.New("service break").

  • Half Open

    If the number of requests exceeds the half-open limit, the request is rejected.

  • Closed

    Decide whether the window needs to slide.

Call: the target service

The request is only actually proxied to the target server once the Before pre-check passes.

b.Metrics.Call() makes the current counter execute Norm.AllCnt++.
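
Metrics.Call itself is not shown in the article; given the description above, it is presumably just a one-line increment (a sketch, not the demo's verbatim code):

// Sketch only: per the description above, Metrics.Call presumably just bumps
// the total request count of the current window.
func (m *Metrics) Call() {
	m.Norm.AllCnt++
}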

After logic
func (b *Breaker) After(response bool) error {
	// The request succeeded
	if true == response {
		// Succ count +1
		b.Metrics.Succ()

		// If the current fuse is half-open and the number of consecutive successes reaches the threshold, then the state machine needs to flow to the closed state
		if b.state == STATE_HALFOPEN && b.Metrics.Norm.ContinuousSuccCnt >= b.HalfMaxCalls {
			b.Change(STATE_CLOSED, time.Now())
		}
	} else {
		// Fail counts +1
		b.Metrics.Fail()

		// If the current fuse is half-open, then the state machine needs to be turned on
		if b.state == STATE_HALFOPEN {
			b.Change(STATE_OPEN, time.Now())
		}

		// If the current fuses are closed, decide whether to transfer the state based on the fuses policy
		if b.state == STATE_CLOSED {
			if b.Strategy.Factory().Adapter(b.Metrics) {
				b.Change(STATE_OPEN, time.Now())
			}
		}
	}
	return nil
}

The response bool parameter indicates whether the call to the target service succeeded (i.e. err == nil).

When the request succeeds

b.Metrics.Succ() is executed on the current counter:

func (m *Metrics) Succ() {
	m.Norm.SuccCnt++
	m.Norm.ContinuousSuccCnt++
	m.Norm.ContinuousFailCnt = 0
}

Note that ContinuousFailCnt (the consecutive-failure count) is reset to 0 here.

From here, different states lead to different decisions:

  • Open state: this logic cannot be reached

  • Close state: SuccCnt++ is recorded as normal

  • Half Open state: we need to decide whether the Breaker can be closed and service restored.

    The recovery policy in the demo source is that the number of consecutive successes must reach the configured half-open request limit:

    b.Metrics.Norm.ContinuousSuccCnt >= b.HalfMaxCalls

    This is not absolute, though; feel free to roll your own ~

When the request fails

b.Metrics.Fail() is executed on the current counter:

func (m *Metrics) Fail() {
	m.Norm.FailCnt++
	m.Norm.ContinuousFailCnt++
	m.Norm.ContinuousSuccCnt = 0
}

Note that ContinuousSuccCnt (the consecutive-success count) is reset to 0 here.

Here too, the state transition depends on the current state:

  • Open state: just record FailCnt++

  • Half Open state: the state machine must immediately switch back to Open

  • Close state: decide whether to move to Open based on the circuit breaker policy

    The demo wires up the circuit breaker policies with a simple factory pattern (a sketch of adding a custom policy follows this list):

    // Circuit breaker policy interface
    type BreakStrategyFunc interface {
    	Adapter(metrics *Metrics) bool // every policy must implement the Adapter adaptation method
    }
    
    // Factory
    func (bsc BreakStrategyConfig) Factory() BreakStrategyFunc {
    	switch bsc.BreakStrategy {
    	case BREAK_STRATEGY_FAILCNT:
    		return &BsFailCnt{&bsc}
    		break
    	case BREAK_STRATEGY_CONTINIUOUSFAILCNT:
    		return &BsContinuousFailCnt{&bsc}
    		break
    	case BREAK_STRATEGY_FAILRATE:
    		return &BsFailRate{&bsc}
    		break
    	default:
    		panic(fmt.Sprintf("unknown break strategy : %d", bsc.BreakStrategy))
    	}
    	return nil
    }

    There are three strategies:

    • If the number of failures is greater than or equal to N times in a time window, enable the circuit breaker.

      func (bs *BsFailCnt) Adapter(metrics *Metrics) bool {
      	return metrics.Norm.FailCnt >= bs.FailCntThreshold
      }
    • According to the continuous error count, if the number of consecutive failures is >=N within a time window, the fuse is enabled.

      func (bs *BsContinuousFailCnt) Adapter(metrics *Metrics) bool {
      	return metrics.Norm.ContinuousFailCnt >= bs.ContinuousFailCntThreshold
      }
    • According to the error ratio, if the error ratio in a time window is greater than or equal to N%, enable the circuit breaker.

      func (bs *BsFailRate) Adapter(metrics *Metrics) bool {
      	// Convert to float64 before dividing (integer division would lose the fraction)
      	rate := float64(metrics.Norm.FailCnt) / float64(metrics.Norm.AllCnt)
      	return rate >= bs.FailRate
      }
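
Since every policy only has to implement the Adapter method, adding your own is straightforward. As a purely illustrative sketch (not part of the demo repo), a policy that trips only once the window has seen some minimum number of requests and the failure rate crosses a threshold could look like this:

// Illustration only, not from the demo: a custom policy that requires at least
// MinCalls requests in the current window before it will trip on the failure rate.
type BsFailRateWithMinCalls struct {
	MinCalls      int64
	RateThreshold float64
}

func (bs *BsFailRateWithMinCalls) Adapter(metrics *Metrics) bool {
	if metrics.Norm.AllCnt < bs.MinCalls {
		return false
	}
	return float64(metrics.Norm.FailCnt)/float64(metrics.Norm.AllCnt) >= bs.RateThreshold
}

To plug something like this into the demo you would also add a new BreakStrategy constant and a matching case in Factory.
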
Detailed operation of state flow
// State flow
func (b *Breaker) Change(state State, now time.Time) {
	// Switch state
	switch state {
	case STATE_OPEN:
		b.OpenTime = now // Update the fuse opening time
		b.state = state
		// The new window time is after the cooldown is increased
		now = now.Add(b.CoolingTime)
		break
	case STATE_HALFOPEN:
		b.state = state
		now = time.Time{}
	case STATE_CLOSED:
		b.state = state
		// New window time
		now = now.Add(b.Metrics.Win.Size)
	case b.state:
		return
	default:
		return
	}

	// Restart the counter
	b.Metrics.Restart(now)
}

First, the principle is that the window slides whenever the state changes: b.Metrics.Restart(now) is executed on every transition. Although the code calls this "restarting the counter", what it actually does is slide the window and reset the statistics (a sketch of what Restart presumably looks like follows the list below).

Second, the details differ depending on the target state:

  • Open: the breaker's open time is updated, and the new window start time is now.Add(b.CoolingTime), i.e. pushed out past the cooling period
  • Half Open: no extra bookkeeping beyond the restart
  • Close: the window slides forward by one window interval, now.Add(b.Metrics.Win.Size)
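
Metrics.Restart itself is not shown in the article. Judging by how it is called, it presumably moves the window start time and clears the statistics, roughly like this sketch (the MetricsID increment is an assumption):

// Sketch only: inferred from how Restart is called in Before and Change.
func (m *Metrics) Restart(t time.Time) {
	m.MetricsID++       // assumed: mark a new counter generation
	m.Win.StartTime = t // slide the window to the new start time
	m.Norm = &Norm{}    // reset all statistics
}
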
Go Test
breaker := NewBreaker(Config{
		HalfMaxCalls: 3,
		WindowSize:   2 * time.Second,
		Strategy: &BreakStrategyConfig{
			BreakStrategy:    BREAK_STRATEGY_FAILCNT,
			FailCntThreshold: 1,
		},
		CoolingTime: 5 * time.Second,
	})
	var succHandler = func(cnt int) {
		for i := 0; i < cnt; i++ {
			if _, err := breaker.Call(func() (i interface{}, err error) {
				return nil, nil
			}); err != nil {
				fmt.Printf("[%s] SuccCall - %s state:%s \n", time.Now().Format("2006-01-02 15:04:05"), err.Error(), breaker.state.Name())
			} else {
				fmt.Printf("[%s] SuccCall - service is ok state:%s \n", time.Now().Format("2006-01-02 15:04:05"), breaker.state.Name())
			}
			time.Sleep(1 * time.Second)
		}
	}
	var failHandler = func(cnt int) {
		for i := 0; i < cnt; i++ {
			if _, err := breaker.Call(func() (i interface{}, err error) {
				return nil, errors.New("test err")
			}); err != nil {
				fmt.Printf("[%s] FailCall - %s state:%s \n", time.Now().Format("2006-01-02 15:04:05"), err.Error(), breaker.state.Name())
			} else {
				fmt.Printf("[%s] FailCall - service is ok state:%s \n", time.Now().Format("2006-01-02 15:04:05"), breaker.state.Name())
			}
			time.Sleep(1 * time.Second)
		}
	}
  
  // Test order
	succHandler(5)  // succ 5 times
	failHandler(5)  // fail 5 times
	succHandler(2)  // succ 2 times
	failHandler(1)  // fail 1 time
	succHandler(10) // succ 10 times

	t.Log("Done")

NewBreaker's configuration: at most three requests in the half-open state, a 2-second time window, a 5-second cooling period, and a fail-count break strategy with a threshold of 1.

succHandler and failHandler issue successful and failing requests respectively, sleeping 1s after each request.

Test Result:

The source address

Demo address: github.com/xiaoxuz/bre…

Call it a day

Call it a day, thank you for your support!