A rate limiter, as the name suggests, is a component used to throttle highly concurrent requests.

Rate limiting can be applied at the Nginx layer or inside business code, and it is widely used in microservices and service meshes. There are three common rate limiting approaches: semaphore, leaky bucket, and token bucket. This article describes each in turn.

The example programs in this article are implemented in Go.

1. Problem description

Rapid user growth, a sudden surge in a popular business, or malicious traffic such as crawlers can make the number of requests spike. For example, on the day exam results are released, requests to a school's educational administration system can grow more than 100-fold; before long the interface becomes almost unusable, and the chain reaction brings down the whole system. How should we deal with this? Everyday life offers an answer: old-fashioned power switches are fitted with fuses that blow when an overly powerful appliance is plugged in, protecting the circuit from being burned out by excessive current. Similarly, an interface needs a "fuse" to keep unexpected request pressure from crushing the system: when traffic is too heavy, it can reject or divert requests.

Because back-end services are complex and diverse, each container may hit a single-node bottleneck after deployment. Once the load exceeds the threshold, memory or CPU becomes the bottleneck and the service becomes unavailable, or the single-node container simply hangs or restarts.

2. Semaphore-based rate limiting

Many languages ship with semaphores. Semaphore in Java, for example, is a counting semaphore, commonly used to limit the number of threads accessing a resource; it is implemented on top of Java's concurrency package (java.util.concurrent).

A semaphore has two important methods: acquire() and release(). A permit is obtained through acquire(), which blocks until one is available, and returned through release().

After reading the open source implementations in several languages, the author concludes that semaphores come in two main flavors: blocking and non-blocking.

2.1 Blocking Mode

This mode uses a lock or a blocking queue. Taking Go as an example:

// Use a channel as the underlying data structure to implement a blocking semaphore.
type Semaphore struct {
	innerChan chan struct{}
}

func NewSemaphore(num uint64) *Semaphore {
	return &Semaphore{
		innerChan: make(chan struct{}, num),
	}
}

// Acquire obtains a permit, spinning with a short sleep until one is available.
func (s *Semaphore) Acquire() {
	for {
		select {
		case s.innerChan <- struct{}{}:
			return
		default:
			log.Error("semaphore acquire is blocking")
			time.Sleep(100 * time.Millisecond)
		}
	}
}

// Release returns a permit. The default branch is purely defensive.
func (s *Semaphore) Release() {
	select {
	case <-s.innerChan:
		return
	default:
		return
	}
}

In this implementation, a Semaphore struct is defined. Initializing the semaphore is essentially initializing a channel whose capacity is the semaphore's value. Acquiring the semaphore means sending an element into the channel: if many goroutines acquire simultaneously, the channel fills up and further sends block, which caps the number of concurrently running goroutines; that is precisely the control a semaphore provides. Releasing the semaphore means receiving an element from the channel; since Acquire put an element in, the receive is guaranteed to succeed, and the default branch exists only as defensive programming.
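A short usage sketch (the worker function and the numbers are illustrative, not from the original text; imports as in the block above): capping concurrency at 10 with the Semaphore defined above.

func worker(sem *Semaphore, job func()) {
	sem.Acquire()
	defer sem.Release()
	job() // with NewSemaphore(10), at most 10 jobs run at the same time
}

func main() {
	sem := NewSemaphore(10)
	for i := 0; i < 100; i++ {
		go worker(sem, func() { time.Sleep(time.Second) })
	}
	time.Sleep(3 * time.Second) // crude wait so the demo goroutines can finish
}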

2.2 Non-blocking mode

This mode uses concurrency-safe counting, such as atomic increment and decrement.
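For illustration, here is a minimal sketch of such a non-blocking semaphore built on an atomic compare-and-swap loop; the type and method names (TrySemaphore, TryAcquire) are assumptions for this example rather than taken from any particular library:

package semaphore

import "sync/atomic"

// TrySemaphore is a non-blocking counting semaphore backed by an atomic counter.
type TrySemaphore struct {
	count int64 // permits currently held
	limit int64 // maximum permits
}

func NewTrySemaphore(limit int64) *TrySemaphore {
	return &TrySemaphore{limit: limit}
}

// TryAcquire attempts to take a permit and returns immediately:
// true if a permit was obtained, false if the semaphore is full.
func (s *TrySemaphore) TryAcquire() bool {
	for {
		cur := atomic.LoadInt64(&s.count)
		if cur >= s.limit {
			return false // full: the caller decides whether to reject or retry
		}
		// CAS guarantees the increment is not lost under concurrency.
		if atomic.CompareAndSwapInt64(&s.count, cur, cur+1) {
			return true
		}
	}
}

// Release returns a permit.
func (s *TrySemaphore) Release() {
	atomic.AddInt64(&s.count, -1)
}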

3. Rate limiting algorithms

Mainstream rate limiting algorithms fall into two kinds: the leaky bucket algorithm and the token bucket algorithm, and many articles and papers already explain both in detail. In principle the two are mirror images of each other: one regulates the water flowing into the bucket, the other the water leaking out. It is worth mentioning that Uber's open source rate-limiting component adopts the leaky bucket algorithm, while Google Guava's RateLimiter is generally described as a token bucket.

3.1 Leaky bucket algorithm

The Leaky Bucket algorithm is simple: water (requests) first enters the leaky bucket, and the bucket leaks water at a fixed rate (the interface's response rate). When the inflow rate is too high, water spills over the top (the request rate exceeds what the interface can handle) and the excess requests are rejected. The leaky bucket algorithm thus enforces a hard cap on the data transmission rate.

Two variables matter here: the size of the bucket, which determines how large a burst can be absorbed when traffic spikes, and the size of the leak, i.e., the outflow rate.

The leaky bucket algorithm can be implemented with a Redis queue: before enqueuing a message, the producer checks whether the queue length exceeds a threshold and discards the message if it does, while consumers fetch messages from the queue at a fixed rate. The Redis queue acts as a buffer pool here, smoothing peaks, filling valleys, and shaping traffic.
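A hedged sketch of this Redis-queue approach, using the go-redis client; the key name, bucket size, and leak rate are illustrative assumptions:

package leakybucket

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
)

const (
	queueKey   = "leaky:queue" // illustrative key name
	bucketSize = 1000          // bucket capacity: the largest burst that can be buffered
)

// produce discards the message when the bucket (queue) is full,
// otherwise pushes it into the queue.
func produce(ctx context.Context, rdb *redis.Client, msg string) error {
	n, err := rdb.LLen(ctx, queueKey).Result()
	if err != nil {
		return err
	}
	if n >= bucketSize {
		return errors.New("bucket full, message discarded")
	}
	return rdb.LPush(ctx, queueKey, msg).Err()
}

// consume leaks messages out of the queue at a fixed rate (~100 msg/s here).
func consume(ctx context.Context, rdb *redis.Client) {
	t := time.NewTicker(10 * time.Millisecond)
	defer t.Stop()
	for range t.C {
		msg, err := rdb.RPop(ctx, queueKey).Result()
		if errors.Is(err, redis.Nil) {
			continue // queue empty, nothing to leak
		}
		if err != nil {
			log.Println("rpop:", err)
			continue
		}
		_ = msg // hand msg to business logic at the fixed leak rate
	}
}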

3.2 Token bucket algorithm

Besides limiting the average data transfer rate, many scenarios also require allowing some degree of burst. The leaky bucket algorithm may not fit there, but the token bucket algorithm does. It works by putting tokens into a bucket at a constant rate; a request must first take a token from the bucket before being processed, and when no token is available, service is denied. The maximum number of tokens the bucket can hold is the allowed burst size.

Tokens are added to the bucket continuously; once the number of tokens reaches the limit, new tokens are discarded. As a result the bucket can sit full of tokens, and incoming requests obtain tokens immediately. For example, with QPS set to 100, one second after the limiter is initialized the bucket already holds 100 tokens, so by then the limiter can withstand 100 simultaneous requests. A request waits only when the bucket is empty, and over time requests execute at the configured rate.

A queue can be prepared to hold the tokens: a background thread periodically generates tokens and pushes them onto the queue, and each request pops a token from the queue before proceeding, as sketched below.
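A minimal Go sketch of this idea, using a buffered channel as the token queue and a goroutine in place of a thread pool; the type name and parameters are illustrative:

package tokenbucket

import "time"

// TokenBucket holds tokens in a buffered channel; a background goroutine
// refills the channel at a constant rate.
type TokenBucket struct {
	tokens chan struct{}
}

// NewTokenBucket creates a bucket with the given capacity (max burst)
// and adds one token every refill interval.
func NewTokenBucket(capacity int, refill time.Duration) *TokenBucket {
	tb := &TokenBucket{tokens: make(chan struct{}, capacity)}
	for i := 0; i < capacity; i++ {
		tb.tokens <- struct{}{} // start full, as described above
	}
	go func() {
		t := time.NewTicker(refill)
		defer t.Stop()
		for range t.C {
			select {
			case tb.tokens <- struct{}{}: // add a token
			default: // bucket full: discard the token
			}
		}
	}()
	return tb
}

// Allow takes a token if one is available and reports whether the
// request may proceed.
func (tb *TokenBucket) Allow() bool {
	select {
	case <-tb.tokens:
		return true
	default:
		return false
	}
}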

3.3 Implementing the leaky bucket algorithm

Here the author gets straight to the point and shows a Go implementation of this algorithm:

// incLimit returns an error once the counter exceeds the limit.
func (sp *servicePanel) incLimit() error {
	if sp.currentLimitCount.Load() > sp.currLimitFunc(nil) {
		return ErrCurrentLimit
	}
	sp.currentLimitCount.Inc()
	return nil
}

// clearLimit resets the counter every second, so the limit applies per second.
// For example, a limit of 1000 req/s means up to 1000 counts are cleared each
// second. A token bucket refills periodically; this is the reverse idea:
// empty the counter every second instead.
func (sp *servicePanel) clearLimit() {
	t := time.NewTicker(time.Second)
	for {
		select {
		case <-t.C:
			sp.currentLimitCount.Store(0)
		}
	}
}

This implementation is admittedly rough: it does not strictly enforce a fixed interval between consecutive requests but counts at a coarse one-second granularity. Around the boundary between two seconds, up to twice the per-second limit can get through; this looks like a violation of the limit, but it is only an instantaneous doubling, and any healthy system should be able to absorb double the request volume for a moment.

Improvement

If you want to strictly enforce a fixed interval between requests, you can refine the coarse time granularity as follows:

// incLimit now returns an error if the count per interval exceeds 1.
func (sp *servicePanel) incLimit() error {
	if sp.currentLimitCount.Load() > 1 {
		return ErrCurrentLimit
	}
	sp.currentLimitCount.Inc()
	return nil
}

// clearLimit resets the counter once per interval:
// one second divided by the rate limit.
func (sp *servicePanel) clearLimit() {
	t := time.NewTicker(time.Second / time.Duration(sp.currLimitFunc(nil)))
	for {
		select {
		case <-t.C:
			sp.currentLimitCount.Store(0)
		}
	}
}

Readers can try out the improved leaky bucket algorithm for themselves.
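As a hedged usage example, here is how such a limiter might guard an HTTP endpoint; the handler and its HTTP wiring are assumptions for this sketch (imports: net/http), not part of the original limiter code, and clearLimit is assumed to run in a background goroutine:

func (sp *servicePanel) handle(w http.ResponseWriter, r *http.Request) {
	if err := sp.incLimit(); err != nil {
		// over the limit: reject instead of queueing
		http.Error(w, "too many requests", http.StatusTooManyRequests)
		return
	}
	// ... business logic ...
}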

4. In-depth analysis of Uber's open source ratelimit

Uber has open-sourced ratelimit, a Go rate-limiting library, on GitHub. It is based on the leaky bucket algorithm.

4.1 Installation

# First version
go get github.com/uber-go/[email protected]
# Improved version
go get github.com/uber-go/ratelimit@master

4.2 Usage

First of all, the biggest difference from the author's limiter above is that this is a blocking rate limiter: the caller blocks until its request is allowed through. The limit is usually expressed as rate/s, i.e., rate requests per second. Without further ado, here is a usage example:

func ExampleRatelimit() {
	rl := ratelimit.New(100) // per second

	prev := time.Now()
	for i := 0; i < 10; i++ {
		now := rl.Take()
		if i > 0 {
			fmt.Println(i, now.Sub(prev))
		}
		prev = now
	}
}

The expected output is as follows:

	// Output:
	// 1 10ms
	// 2 10ms
	// 3 10ms
	// 4 10ms
	// 5 10ms
	// 6 10ms
	// 7 10ms
	// 8 10ms
	// 9 10ms

The test results match expectations exactly. In this example the limiter passes 100 requests per second, i.e., one request every 10ms on average, so one line is printed every 10ms.

4.3 Implementation Details

Constructing the limiter

The first step is to construct a limiter, which contains a field called perRequest. This key variable represents the interval between consecutive requests and embodies the core idea of the component's algorithm: keep each request 1s/rate apart, so that no more than rate requests ever pass within one second, which achieves the rate limit.

// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
	l := &limiter{
		perRequest: time.Second / time.Duration(rate),
		maxSlack:   -10 * time.Second / time.Duration(rate),
	}
	for _, opt := range opts {
		opt(l)
	}
	if l.clock == nil {
		l.clock = clock.New()
	}
	return l
}
The blocking Take() method of the limiter

The Take() method is called before each request: it blocks until the request is approved and returns the moment of approval.

The first version

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()

	now := t.clock.Now()

	// If this is our first request, then we allow it.
	if t.last.IsZero() {
		t.last = now
		return t.last
	}

	// sleepFor calculates how much time we should sleep based on
	// the perRequest budget and how long the last request took.
	// Since the request may take longer than the budget, this number
	// can get negative, and is summed across requests.
	t.sleepFor += t.perRequest - now.Sub(t.last)

	// We shouldn't allow sleepFor to get too negative, since it would mean that
	// a service that slowed down a lot for a short period of time would get
	// a much higher RPS following that.
	if t.sleepFor < t.maxSlack {
		t.sleepFor = t.maxSlack
	}

	// If sleepFor is positive, then we should sleep now.
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}

	return t.last
}

From the implementation it can be seen that the first version uses a Go mutex: requests queue up on the lock and then sleep in turn. After sleeping, consecutive requests are spaced by a constant interval, so the number of requests per unit time is fixed and the rate limit is enforced.

The second version

// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *limiter) Take() time.Time {
	newState := state{}
	taken := false
	for !taken {
		now := t.clock.Now()

		previousStatePointer := atomic.LoadPointer(&t.state)
		oldState := (*state)(previousStatePointer)

		newState = state{}
		newState.last = now

		// If this is our first request, then we allow it.
		if oldState.last.IsZero() {
			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
			continue
		}

		// sleepFor calculates how much time we should sleep based on
		// the perRequest budget and how long the last request took.
		// Since the request may take longer than the budget, this number
		// can get negative, and is summed across requests.
		newState.sleepFor += t.perRequest - now.Sub(oldState.last)
		// We shouldn't allow sleepFor to get too negative, since it would mean that
		// a service that slowed down a lot for a short period of time would get
		// a much higher RPS following that.
		if newState.sleepFor < t.maxSlack {
			newState.sleepFor = t.maxSlack
		}
		if newState.sleepFor > 0 {
			newState.last = newState.last.Add(newState.sleepFor)
		}
		taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
	}
	t.clock.Sleep(newState.sleepFor)
	return newState.last
}

The second version replaces the lock with atomic operations and a for spin loop, reducing goroutine contention on the lock. Both versions, whether built on the lock or on atomics, essentially make requests queue up: the first version contends on the lock and then sleeps in turn; the second avoids lock contention, but all goroutines may exit the for loop almost immediately and then block together in Sleep.
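As a usage illustration (not taken from the library's documentation), one way to wire this limiter into an HTTP server is a blocking middleware; limitMiddleware is a name assumed for this sketch, and the library's import path is go.uber.org/ratelimit:

import (
	"net/http"

	"go.uber.org/ratelimit"
)

// limitMiddleware delays each request until Take() grants it a slot.
func limitMiddleware(rl ratelimit.Limiter, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rl.Take() // blocks until this request is allowed through
		next.ServeHTTP(w, r)
	})
}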

5. Summary

There are three powerful tools for ensuring service stability: circuit breaking with degradation, rate limiting, and fault injection. This article focused on a common high-availability strategy in distributed systems: rate limiting. It is usually implemented in one of three ways: semaphore (counter), leaky bucket, or token bucket. The article implemented a small plug-in based on the leaky bucket idea, and finally analyzed Uber's open source uber-go/ratelimit, whose second version of the blocking Take() method handles goroutine lock contention more gracefully.

Book recommendation

My book has now been published; you are welcome to buy it:

Creating original content is not easy. I hope for your support and look forward to exchanging ideas and learning with you.
