The three powerful tools of distributed systems: rate limiting, caching, and degradation

As a business grows and expands, the pressure on back-end services keeps increasing. To build an efficient and stable system, designs such as distributed architectures and microservices were introduced, and with them the complexity of system design also grows.

Rate limiting: at the traffic entry point of the system, limit the amount of incoming traffic to protect the system;

Caching: cache data from the database to improve access speed and concurrency and to protect database resources.

Degradation and circuit breaking are similar in that both are overload protection mechanisms, but they differ in how they are implemented:

Degradation: within the system, some non-critical services, such as log collection, can be suspended or delayed so that the other services keep working normally.

Circuit breaking: when a service is overloaded, to prevent a system-wide failure, the service is shut down or only part of the requests are allowed to succeed while the rest fail. For example, if the number of consecutive request failures reaches 20 within 10 seconds, the circuit breaker is triggered and 60% of the requests are filtered out.
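
To make the circuit breaker example above concrete, here is a minimal, hypothetical sketch in Go (it counts failures within a window rather than strictly consecutive failures, and all names and thresholds are illustrative, not a production implementation):

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// breaker is a toy circuit breaker: if failLimit failures are recorded within
// window, the breaker opens and roughly rejectRatio of requests are filtered out.
type breaker struct {
	mu          sync.Mutex
	window      time.Duration
	failLimit   int
	rejectRatio float64
	failCount   int
	windowStart time.Time
}

func newBreaker() *breaker {
	return &breaker{
		window:      10 * time.Second,
		failLimit:   20,
		rejectRatio: 0.6,
		windowStart: time.Now(),
	}
}

// Allow reports whether a request may pass; when the breaker is open,
// about 60% of requests are rejected.
func (b *breaker) Allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if time.Since(b.windowStart) > b.window { // start a new statistics window
		b.windowStart = time.Now()
		b.failCount = 0
	}
	if b.failCount >= b.failLimit { // breaker is open: filter out rejectRatio of the requests
		return rand.Float64() >= b.rejectRatio
	}
	return true
}

// Fail records a failed call.
func (b *breaker) Fail() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.failCount++
}

func main() {
	b := newBreaker()
	b.Fail() // record a failed downstream call
	fmt.Println("allowed:", b.Allow())
}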

The necessity of rate limiting

This article focuses on the first tool, rate limiting; more detailed articles on caching and degradation will follow.

To keep the system running smoothly, when traffic exceeds the expected value, rate limiting must be applied selectively to certain requests according to preset rate limiting rules.

In a distributed system, an interface is invoked by multiple systems. Requests from some callers may surge suddenly and monopolize server resources, while requests from other callers get no chance to be served and queue up, so the overall response time of the microservice grows or even times out. To prevent an interface from being over-invoked, fine-grained rate limiting is therefore required for each caller.
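
As a concrete illustration of per-caller rate limiting, the hypothetical sketch below keeps one token-bucket limiter per caller using the golang.org/x/time/rate package; the X-Caller-ID header, the quota of 50 requests per second with a burst of 100, and the port are all made-up values:

package main

import (
	"net/http"
	"sync"

	"golang.org/x/time/rate"
)

var (
	mu       sync.Mutex
	limiters = map[string]*rate.Limiter{} // one limiter per caller
)

// limiterFor returns the limiter of a caller, creating it on first use.
func limiterFor(caller string) *rate.Limiter {
	mu.Lock()
	defer mu.Unlock()
	l, ok := limiters[caller]
	if !ok {
		l = rate.NewLimiter(50, 100) // 50 req/s with a burst of 100, an illustrative quota
		limiters[caller] = l
	}
	return l
}

func handler(w http.ResponseWriter, r *http.Request) {
	caller := r.Header.Get("X-Caller-ID") // hypothetical header identifying the caller
	if !limiterFor(caller).Allow() {
		http.Error(w, "too many requests", http.StatusTooManyRequests)
		return
	}
	w.Write([]byte("ok"))
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}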

Rate limiting schemes

Rate limiting at the gateway layer

The figure above shows a request traffic funnel model, processed in the following order:

  1. User requests are routed through the gateway to the service layer
  2. The service layer retrieves cached data for each request
  3. If there is no cache hit, the data is requested from the underlying data source directly

As the entrance to the services, the gateway carries all the traffic of the whole system and is the source of the entire access link. It plays the role described by the idiom "one man guarding the pass can hold off ten thousand", so it is the most suitable place to apply rate limiting.

In addition, introducing a gateway layer also solves the following problems in a distributed system:

  • The client requests different microservices multiple times, increasing the complexity of the client
  • Cross-domain requests exist and are complicated to process in certain scenarios
  • Authentication is complex and each service requires independent authentication
  • Refactoring is difficult: as the project iterates, microservices may need to be re-partitioned. For example, several services may be combined into one, or one service may be split into several. If the client communicates directly with the microservices, such refactoring becomes hard to carry out

  • Some microservices may use firewall/browser-unfriendly protocols, making direct access difficult
  • Malicious requests pose security risks

Mainstream choices for the gateway layer include software such as Nginx, gateway components such as Spring Cloud Gateway and Zuul, and hardware-plus-software solutions such as F5 (which is extremely expensive). In addition, large companies have built their own components, such as Tencent Rio.

Rate limiting with middleware

When it comes to middleware, the first thing that comes to mind is the message queue (MQ). As the name implies, a message queue is a "queue" storing "messages", with FIFO semantics. A "message" here is simply "data", so a message queue is itself a simple data structure: a queue.

In terms of design patterns, message queues are a classic implementation of the producer-consumer pattern. In general, the user is the "producer" of requests and the back-end service is the "consumer". Kafka, RabbitMQ, RocketMQ and so on are common MQs in current systems; they provide asynchronous decoupling as well as "peak shaving and valley filling", that is, absorbing bursts and smoothing out the load.
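
To make peak shaving concrete, here is a toy in-process analogy: a buffered channel plays the role of the queue, a producer bursts requests into it, and a consumer drains it at a steady pace. A real system would use Kafka, RabbitMQ or RocketMQ instead, and all sizes and rates below are arbitrary:

package main

import (
	"fmt"
	"time"
)

func main() {
	queue := make(chan int, 100) // the "message queue": buffers a burst of up to 100 requests

	// Producer: a burst of 30 requests arrives almost at once.
	go func() {
		for i := 1; i <= 30; i++ {
			queue <- i
		}
		close(queue)
	}()

	// Consumer: the backend drains the queue at a steady 10 requests per second,
	// so the burst is flattened instead of hitting the service all at once.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for req := range queue {
		<-ticker.C
		fmt.Println("processing request", req)
	}
}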

Rate limiting algorithms

There are three common families of rate limiting algorithms: the counter (fixed window, plus its sliding window variant), the leaky bucket, and the token bucket.

Counter (fixed window)

The counter is the simplest rate limiting algorithm and the easiest to implement. Take interface A with a limit of 100 accesses per minute as an example: if the interval between the current request and the first request is less than one minute and the counter already exceeds 100, there are too many requests and the call is rejected; if the interval is greater than one minute and the counter is still within the limit, the counter is reset.

When implementing the counter, pay attention to atomicity to avoid concurrency problems that would make the count inaccurate. The code can be designed as follows:

package main

import (
	"sync/atomic"
	"time"
)

// CounterLimit counter
type CounterLimit struct {
	counter      int64 // counter
	limit        int64 // maximum number of requests allowed in a time window
	intervalNano int64 // time window, in nanoseconds
	unixNano     int64 // Unix timestamp of the window start, in nanoseconds
}

// NewCounterLimit initializes the counter
func NewCounterLimit(interval time.Duration, limit int64) *CounterLimit {
	return &CounterLimit{
		counter:      0,
		limit:        limit,
		intervalNano: int64(interval),
		unixNano:     time.Now().UnixNano(),
	}
}

// Allow determines whether a request is allowed in the current time window
func (c *CounterLimit) Allow() bool {
	now := time.Now().UnixNano()
	if now-atomic.LoadInt64(&c.unixNano) > c.intervalNano { // the current time window has passed, start counting again
		atomic.StoreInt64(&c.counter, 1)
		atomic.StoreInt64(&c.unixNano, now)
		return true
	}

	return atomic.AddInt64(&c.counter, 1) <= c.limit // decide whether to limit this request
}
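
For completeness, a minimal usage sketch of the counter above, allowing at most 100 requests per minute as in the earlier example (it assumes the CounterLimit code above is in the same package):

limiter := NewCounterLimit(time.Minute, 100) // at most 100 requests per minute

for i := 0; i < 150; i++ {
	if limiter.Allow() {
		// within the limit: handle the request
	} else {
		// over the limit: reject the request
	}
}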

The sliding window

The sliding window is an optimized version of the fixed window, designed mainly to solve the boundary (critical point) problem.

As shown in the figure above, if 100 requests arrive instantaneously at 0:59 and another 100 arrive instantaneously at 1:00, the service actually receives 200 requests within one second. We only specified a maximum of 100 requests per minute, roughly 1.7 requests per second; by bursting requests right at the reset point of the time window, a user can instantly exceed our rate limit and overwhelm the application.

The sliding window protocol is used in TCP for flow control during data transmission to avoid congestion. It allows the sender to transmit multiple packets before stopping to wait for an acknowledgement; because the sender does not have to stop and wait after every single packet, the protocol speeds up transmission and improves network throughput.

The sliding window counter avoids the doubled burst allowed by the fixed window counter by subdividing the window and "sliding" it over time, but the finer the time granularity, the more space the algorithm requires.

In the figure above, the whole red rectangle represents one time window, that is, one minute. We divide it into six cells, each representing 10 seconds, and every 10 seconds the window slides one cell to the right. Each cell has its own counter; if a request arrives at 0:25, the counter for 0:20-0:29 is incremented by one.

How does the sliding window solve the boundary problem?

The 100 requests arriving at 0:59 land in the grey cell, while the requests arriving at 1:00 land in the orange cell. When the clock reaches 1:00, the window moves one cell to the right; the total number of requests inside the window is then 200, exceeding the limit of 100, so the limiter detects that rate limiting must be triggered.

The more cells the sliding window is divided into, the more smoothly it rolls and the more accurate the rate limiting statistics are.

package main

import (
	"container/ring"
	"fmt"
	"net"
	"os"
	"sync"
	"sync/atomic"
	"time"
)

var (
	limitCount  int        = 10 // at most 10 requests per 6-second window
	limitBucket int        = 6  // number of cells in the sliding window
	curCount    int32      = 0  // current number of counted requests
	head        *ring.Ring      // ring queue (circular linked list), one element per cell
	mu          sync.Mutex      // protects writes to the ring cells
)

func main() {
	tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") // resolve the TCP address
	checkError(err)
	listener, err := net.ListenTCP("tcp", tcpAddr) // listen on the port
	checkError(err)
	defer listener.Close()

	// Initialize the sliding window
	head = ring.New(limitBucket)
	for i := 0; i < limitBucket; i++ {
		head.Value = 0
		head = head.Next()
	}

	// Start the window mover
	go func() {
		timer := time.NewTicker(time.Second * 1)
		for range timer.C { // slide the window every second
			subCount := int32(0 - head.Value.(int)) // requests leaving the window
			newCount := atomic.AddInt32(&curCount, subCount)

			arr := [6]int{}
			for i := 0; i < limitBucket; i++ { // only for easier printing
				arr[i] = head.Value.(int)
				head = head.Next()
			}
			fmt.Println("move subCount,newCount,arr", subCount, newCount, arr)
			head.Value = 0
			head = head.Next()
		}
	}()

	for {
		conn, err := listener.Accept() // block here; each request is handled in its own goroutine
		if err != nil {
			fmt.Println(err)
			continue
		}
		// One goroutine per request; all goroutines share the same global counter,
		// which is updated with atomic operations.
		go handle(&conn)
	}
}

func handle(conn *net.Conn) {
	defer (*conn).Close()
	n := atomic.AddInt32(&curCount, 1)
	//fmt.Println("handler n:", n)
	if n > int32(limitCount) { // the rate limit is exceeded
		atomic.AddInt32(&curCount, -1) // roll the count back atomically
		(*conn).Write([]byte("HTTP/1.1 404 NOT FOUND\r\n\r\nError, too many request, please try again"))
	} else {
		mu.Lock()
		pos := head.Prev()
		val := pos.Value.(int)
		val++
		pos.Value = val
		mu.Unlock()
		time.Sleep(1 * time.Second)                                             // assume the business logic takes 1s
		(*conn).Write([]byte("HTTP/1.1 200 OK\r\n\r\nI can change the world!")) // business done, return 200
	}
}

// checkError handles fatal errors
func checkError(err error) {
	if err != nil {
		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
		os.Exit(1)
	}
}

Since the sliding window can be hard to grasp, the load test log below is printed out; the load testing tool is github.com/rakyll/hey

$ ./hey -c 6 -n 300 -q 6 -t 80 http://localhost:9090
move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
move subCount,newCount,arr 0 6 [0 0 0 0 0 6]
move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
move subCount,newCount,arr -4 6 [4 0 0 0 0 6]
move subCount,newCount,arr 0 10 [0 0 0 0 6 4]
move subCount,newCount,arr 0 10 [0 0 0 6 4 0]
move subCount,newCount,arr 0 10 [0 0 6 4 0 0]
move subCount,newCount,arr 0 10 [0 6 4 0 0 0]
move subCount,newCount,arr -6 4 [6 4 0 0 0 0]
move subCount,newCount,arr -4 3 [4 0 0 0 0 3]
move subCount,newCount,arr 0 3 [0 0 0 0 3 0]
move subCount,newCount,arr 0 3 [0 0 0 3 0 0]
move subCount,newCount,arr 0 3 [0 0 3 0 0 0]
move subCount,newCount,arr 0 3 [0 3 0 0 0 0]
move subCount,newCount,arr -3 0 [3 0 0 0 0 0]
move subCount,newCount,arr 0 0 [0 0 0 0 0 0]
move subCount,newCount,arr 0 0 [0 0 0 0 0 0]

Leaky bucket

The ideas behind the leaky bucket algorithm are as follows:

  • Each request is stored as a "drop of water" in the "leaky bucket";
  • The "leaky bucket" leaks (processes requests) at a fixed rate; if the bucket is empty, it stops leaking;
  • If the bucket is full, excess drops of water are discarded.

package main

import (
	"math"
	"time"
)

// BucketLimit leaky bucket structure
type BucketLimit struct {
	rate       float64 // the rate at which water leaks out of the bucket
	bucketSize float64 // the maximum amount of water the bucket can hold
	unixNano   int64   // Unix timestamp, in nanoseconds
	curWater   float64 // the amount of water currently in the bucket
}

// NewBucketLimit initializes the leaky bucket
func NewBucketLimit(rate float64, bucketSize int64) *BucketLimit {
	return &BucketLimit{
		bucketSize: float64(bucketSize),
		rate:       rate,
		unixNano:   time.Now().UnixNano(),
		curWater:   0,
	}
}

// refresh updates the current amount of water in the bucket
func (b *BucketLimit) refresh() {
	now := time.Now().UnixNano()
	// time difference, converted from nanoseconds to seconds
	diffSec := float64(now-b.unixNano) / 1000 / 1000 / 1000
	b.curWater = math.Max(0, b.curWater-diffSec*b.rate)
	b.unixNano = now
}

// Allow reports whether the request still fits into the bucket
func (b *BucketLimit) Allow() bool {
	b.refresh()
	if b.curWater < b.bucketSize {
		b.curWater = b.curWater + 1
		return true
	}

	return false
}
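
A minimal usage sketch of the leaky bucket above (assuming the BucketLimit code is in the same package); the leak rate and capacity are arbitrary:

limiter := NewBucketLimit(100, 200) // leaks 100 "drops" per second, holds at most 200

if limiter.Allow() {
	// there is room in the bucket: handle the request
} else {
	// the bucket is full: reject the request
}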

The leaky bucket algorithm also has an obvious drawback: when a large burst of requests arrives in a short time, each request has to wait in the queue for a while before being served, even though the server may be under no load at that moment.

The token bucket

Besides limiting the average data transfer rate, many application scenarios also need to allow some degree of burst. In such cases the leaky bucket algorithm may not be suitable, and the token bucket algorithm fits better. The token bucket algorithm puts tokens into a bucket at a constant rate; to be processed, a request must first take a token from the bucket, and when no token is available the request is rejected.

package main

import (
	"fmt"
	"time"
)

// TokenBucket token bucket structure
type TokenBucket struct {
	rate      int // the rate at which tokens are added
	tokenSize int // the token bucket capacity
	curNum    int // the number of tokens currently in the bucket
}

// NewTokenBucket initializes the token bucket
func NewTokenBucket(rate, tokenSize int) *TokenBucket {
	return &TokenBucket{
		rate:      rate,
		tokenSize: tokenSize,
	}
}

// PushToken fills the bucket and keeps adding tokens at the configured rate
func (t *TokenBucket) PushToken() chan struct{} {
	tb := make(chan struct{}, t.tokenSize)
	ticker := time.NewTicker(time.Duration(1000) * time.Microsecond)
	// Fill the bucket with its initial tokens
	for i := 0; i < t.tokenSize; i++ {
		tb <- struct{}{}
	}
	t.curNum = t.tokenSize

	// Keep adding tokens at the specified rate
	go func() {
		for {
			for i := 0; i < t.rate; i++ {
				tb <- struct{}{}
			}
			t.curNum += t.rate
			if t.curNum > t.tokenSize {
				t.curNum = t.tokenSize
			}
			<-ticker.C
		}
	}()
	return tb
}

// PopToken takes n tokens from the bucket
func (t *TokenBucket) PopToken(bucket chan struct{}, n int) {
	for i := 0; i < n; i++ {
		_, ok := <-bucket
		if ok {
			t.curNum -= 1
			fmt.Println("get token success")
		} else {
			fmt.Println("get token fail")
		}
	}
}

Comparison of four algorithms

| Algorithm | Parameters | Space complexity | Time complexity | Limits burst traffic | Smooth rate limiting | Implementation difficulty in a distributed environment |
| --- | --- | --- | --- | --- | --- | --- |
| Fixed window | Counting period T, maximum number of accesses N within the period | Low, O(1) (records the access count and start time of one period) | Low, O(1) | No | No | Low |
| Sliding window | Counting period T, maximum number of accesses N within the period | High, O(N) (records the access count of each small period) | Medium, O(N) | Yes | Relatively smooth; the more cells the window has, the smoother it rolls | Medium |
| Leaky bucket | Leak rate r, bucket capacity N | Low, O(1) (records the current amount of water in the bucket) | O(N) | Yes | Yes | High |
| Token bucket | Token generation rate r, token bucket capacity N | Low, O(1) (records the current number of tokens in the bucket) | O(N) | Yes | Yes | High |

Conclusion

The code in this article has been uploaded to the repository; see github.com/MerlinFeng/…

In practical system design, the token bucket and leaky bucket algorithms are the most widely used, and each has its own application scenarios.

The token bucket is used to protect the service itself: it mainly limits the call rate of upstream callers so that the service is not overwhelmed. Because of this, if a burst arrives while processing capacity is still available (the actual consumption capacity is stronger than the configured limit), the actual processing rate can exceed the configured limit.

The leaky bucket algorithm is used to protect others, that is, the downstream systems we call. The typical scenario is a third-party system that has no protection mechanism of its own, or that imposes a call rate limit we must not exceed. Since we cannot change the third-party system, the control can only happen on the caller's side; in that case, even if our traffic bursts, the excess has to be dropped, because the consumption capacity is determined by the third party.
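
As an illustration of the "protect others" case, the hypothetical sketch below paces outgoing calls to a third-party API with a ticker, which is leaky-bucket style smoothing: no matter how fast calls pile up on our side, they reach the downstream system at a fixed rate. The endpoint and the 5-calls-per-second limit are made up:

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	// The third party allows at most 5 calls per second, so we release
	// one call every 200ms no matter how fast requests pile up on our side.
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()

	jobs := []string{"order-1", "order-2", "order-3"} // pending calls queued on our side
	for _, job := range jobs {
		<-ticker.C // wait for the next "drop" to leak out
		resp, err := http.Get("https://third-party.example.com/api?id=" + job) // hypothetical endpoint
		if err != nil {
			fmt.Println("call failed:", err)
			continue
		}
		resp.Body.Close()
		fmt.Println("called third party for", job, "status", resp.StatusCode)
	}
}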