Rate limiting is a common way to protect a service in high-concurrency scenarios, and Go offers several ways to implement it.

A recent project required designing concurrency control and rate limiting for a system, so I am taking the opportunity to survey Go's concurrency-control and rate-limiting strategies as a whole. For reasons of length, this article covers only rate limiting; Go's concurrency control will be sorted out in a follow-up.

1. Concurrency-control rate limiting

Channel buffering lets us cap concurrency: before launching a goroutine, write a value into a buffered channel (what we write doesn't matter), and have the goroutine read a value back out when it finishes. The total number of concurrently running goroutines is then limited by the channel's buffer size.

For example, we can use a buffered bool channel as the concurrency counter:

chLimit := make(chan bool, 1)


Then, each time we create a new goroutine in the loop, we first push a value into chLimit:

for i, sleeptime := range input {

    chs[i] = make(chan string, 1)

    chLimit <- true

    go limitFunc(chLimit, chs[i], i, sleeptime, timeout)

}


The wrapped function is launched concurrently with the go keyword; when it finishes, it consumes one value from the chLimit buffer:

limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {

    Run(task_id, sleeptime, timeout, ch)

    <-chLimit

}


This way, once the number of running goroutines reaches chLimit's buffer size, the main goroutine blocks on the send; it resumes creating goroutines only after a running goroutine finishes and drains a value from chLimit. That achieves our goal of limiting the number of concurrent requests.

Complete code:

package main

import (

    "fmt"

    "time"

)

func Run(task_id, sleeptime, timeout int, ch chan string) {

    ch_run := make(chan string)

    go run(task_id, sleeptime, ch_run)

    select {

    case re := <-ch_run:

        ch <- re

    case <-time.After(time.Duration(timeout) * time.Second):

        re := fmt.Sprintf("task id %d , timeout", task_id)

        ch <- re

    }

}

func run(task_id, sleeptime int, ch chan string) {

 

    time.Sleep(time.Duration(sleeptime) * time.Second)

    ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)

    return

}

func main() {

    input := []int{3, 2, 1}

    timeout := 2

    chLimit := make(chan bool, 1)

    chs := make([]chan string, len(input))

    limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {

        Run(task_id, sleeptime, timeout, ch)

        <-chLimit

    }

    startTime := time.Now()

    fmt.Println("Multirun start")

    for i, sleeptime := range input {

        chs[i] = make(chan string, 1)

        chLimit <- true

        go limitFunc(chLimit, chs[i], i, sleeptime, timeout)

    }

    for _, ch := range chs {

        fmt.Println(<-ch)

    }

    endTime := time.Now()

    fmt.Printf("Multissh finished. Process time %s. Number of task is %d", endTime.Sub(startTime), len(input))

}


Running results:

Multirun start

task id 0 , timeout

task id 1 , timeout

task id 2 , sleep 1 second

Multissh finished. Process time 5s. Number of task is 3


2. Counter

The counter is one of the simplest rate-limiting algorithms. The principle: within a time period, count the requests and compare the count against a threshold to decide whether to limit; once the period's boundary is reached, clear the counter. It's like boarding a bus: the bus has a fixed number of seats, and once it's full no more passengers may board, otherwise the bus is overloaded and the driver gets fined when the traffic police catch it. For our systems there is no fine to pay; the system may simply crash.

Program execution logic:

  • Keep a count variable in the program; when a request comes in, increment count by 1 and record the request time.
  • When the next request arrives, check whether count exceeds the configured threshold and whether the current request falls within 1 minute of the first request.
  • If the number of requests within 1 minute exceeds the threshold, reject subsequent requests.
  • If the interval since the first request exceeds the counting cycle and the count is still within the limit, reset the counter.

Code implementation:

package main

import (

    "log"

    "sync"

    "time"

)

type Counter struct {

    rate  int           // maximum number of requests allowed in one counting cycle

    begin time.Time     // start time of the current counting cycle

    cycle time.Duration // length of the counting cycle

    count int           // number of requests received in the current cycle

    lock  sync.Mutex

}

func (l *Counter) Allow() bool {

    l.lock.Lock()

    defer l.lock.Unlock()

    if l.count == l.rate-1 {

        now := time.Now()

        if now.Sub(l.begin) >= l.cycle {

            // the counting cycle has elapsed: reset the counter and allow the request

            l.Reset(now)

            return true

        } else {

            return false

        }

    } else {

        // the limit has not been reached yet: increment count by 1

        l.count++

        return true

    }

}

func (l *Counter) Set(r int, cycle time.Duration) {

    l.rate = r

    l.begin = time.Now()

    l.cycle = cycle

    l.count = 0

}

func (l *Counter) Reset(t time.Time) {

    l.begin = t

    l.count = 0

}

func main() {

    var wg sync.WaitGroup

    var lr Counter

    lr.Set(3, time.Second) // allow at most 3 requests within 1 second

    for i := 0; i < 10; i++ {

        wg.Add(1)

        log.Println("Create request :", i)

        go func(i int) {

          if lr.Allow() {

              log.Println("Respond to request :", i)

          }

          wg.Done()

        }(i)

        time.Sleep(200 * time.Millisecond)

    }

    wg.Wait()

}


Output:

2021/02/01 21:16:12 Create request : 0

2021/02/01 21:16:12 Respond to request : 0

2021/02/01 21:16:12 Create request : 1

2021/02/01 21:16:12 Respond to request : 1

2021/02/01 21:16:12 Create request : 2

2021/02/01 21:16:13 Create request : 3

2021/02/01 21:16:13 Create request : 4

2021/02/01 21:16:13 Create request : 5

2021/02/01 21:16:13 Respond to request : 5

2021/02/01 21:16:13 Create request : 6

2021/02/01 21:16:13 Respond to request : 6

2021/02/01 21:16:13 Create request : 7

2021/02/01 21:16:13 Respond to request : 7

2021/02/01 21:16:14 Create request : 8

2021/02/01 21:16:14 Create request : 9


We issue one request every 200ms, which is clearly above the limit of three requests per second. After running, requests numbered 2, 3, 4, 8 and 9 are discarded, showing that the rate limiting works.

Suppose an interface /query allows at most 200 requests per minute, and a user sends 200 requests in the final few milliseconds of the 59th second. When the minute ends and the counter is cleared, the user sends another 200 requests in the next second. All 400 requests comply with our counting logic, yet the system absorbs roughly double the intended load within about one second — this is the counter's design defect. Malicious users can exploit the boundary to push through bursts that may even bring the system down. The big problem with this naive approach is that it handles the boundaries of the time unit poorly.


That said, this counter relies on a mutex, which may become a bottleneck in high-concurrency scenarios. One option is to drop the lock and implement the l.count++ logic with atomic operations (the sync/atomic package), which guarantees the increment is not executed by multiple goroutines at the same time — that is, rate limiting via an atomic counter.
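To make that concrete, here is a minimal sketch of a fixed-window counter built on sync/atomic (my own illustration, not the original code; the type and function names are invented). Note that the reset around the window boundary has a small benign race — a few requests right at the boundary may be miscounted — which is usually acceptable for this kind of coarse limiter:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// AtomicCounter is a fixed-window counter that uses atomic operations
// instead of a mutex on the hot path.
type AtomicCounter struct {
    rate    int64 // maximum number of requests allowed per cycle
    cycleNs int64 // cycle length in nanoseconds
    begin   int64 // start of the current cycle, unix nanoseconds
    count   int64 // requests counted in the current cycle
}

func NewAtomicCounter(rate int64, cycle time.Duration) *AtomicCounter {
    return &AtomicCounter{rate: rate, cycleNs: int64(cycle), begin: time.Now().UnixNano()}
}

func (c *AtomicCounter) Allow() bool {
    now := time.Now().UnixNano()
    begin := atomic.LoadInt64(&c.begin)
    if now-begin >= c.cycleNs {
        // Only one goroutine wins the CAS and resets the count for the new window.
        if atomic.CompareAndSwapInt64(&c.begin, begin, now) {
            atomic.StoreInt64(&c.count, 0)
        }
    }
    return atomic.AddInt64(&c.count, 1) <= c.rate
}

func main() {
    c := NewAtomicCounter(3, time.Second) // at most 3 requests per second
    for i := 0; i < 10; i++ {
        fmt.Println("request", i, "allowed:", c.Allow())
        time.Sleep(200 * time.Millisecond)
    }
}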

3. Sliding window

The sliding window targets the counter's critical-point defect. The term comes from flow control in the TCP protocol. A sliding window divides a fixed time window into slices and moves forward as time passes: a fixed number of cells slide along, each keeping its own count, and the total is compared against the threshold.


In the figure above, the red dashed frame represents one time window (one minute), divided into 6 cells of 10 seconds each. Every 10 seconds the window slides one cell to the right, in the direction of the red arrow. Each cell has its own independent counter: if a request arrives at 0:45, the counter of the fifth cell (0:40–0:50) is incremented. To decide whether to limit, add up the counters of all cells in the window and compare the sum with the configured threshold.

So how does the sliding window solve the boundary problem described above? Take a look at the figure below:


When the user sends 200 requests within the 0:59 second, the sixth cell's counter records +200. When the window slides one cell to the right a second later, those 200 requests are still inside the window, so if the user sends more requests, the limit is triggered and the new requests are rejected.

In fact, the counter is just a sliding window with a single cell. To limit more precisely, simply divide the window into more cells; the number of cells determines the precision of the sliding-window algorithm. But because the concept of a time slice still exists, the critical-point problem is mitigated rather than fundamentally solved.
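To make the cell idea concrete, below is a minimal sliding-window sketch (my own illustration; the type and field names are invented, not from a library). The window is split into slots, each slot keeps its own counter, and Allow first advances the window, then sums the live slots against the threshold:

package main

import (
    "fmt"
    "sync"
    "time"
)

// SlidingWindow divides one time window into slotCount cells; each cell
// holds its own counter, and the sum of all cells is compared to rate.
type SlidingWindow struct {
    mu       sync.Mutex
    rate     int           // max requests allowed within the whole window
    slots    []int         // per-cell counters (the "grids")
    slotDur  time.Duration // width of one cell
    pos      int           // index of the current cell
    lastTick time.Time     // start time of the current cell
}

func NewSlidingWindow(rate, slotCount int, window time.Duration) *SlidingWindow {
    return &SlidingWindow{
        rate:     rate,
        slots:    make([]int, slotCount),
        slotDur:  window / time.Duration(slotCount),
        lastTick: time.Now(),
    }
}

func (w *SlidingWindow) Allow() bool {
    w.mu.Lock()
    defer w.mu.Unlock()

    // Slide: every slotDur, move one cell to the right and clear the
    // cell that just fell out of the window.
    for time.Since(w.lastTick) >= w.slotDur {
        w.pos = (w.pos + 1) % len(w.slots)
        w.slots[w.pos] = 0
        w.lastTick = w.lastTick.Add(w.slotDur)
    }

    // Sum all cells and compare against the threshold.
    total := 0
    for _, c := range w.slots {
        total += c
    }
    if total >= w.rate {
        return false
    }
    w.slots[w.pos]++
    return true
}

func main() {
    w := NewSlidingWindow(3, 6, time.Second) // 3 requests/second, 6 cells
    for i := 0; i < 10; i++ {
        fmt.Println("request", i, "allowed:", w.Allow())
        time.Sleep(200 * time.Millisecond)
    }
}

With more cells, the count that "falls out" of the window at each slide is smaller, so the limit is enforced more smoothly.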

4. Leaky bucket

The Leaky Bucket is a bucket of fixed capacity that leaks water at a fixed rate. Think of a tap dripping into a bucket with a small hole in the bottom: water drips in at whatever rate the tap runs, and leaks out of the hole at a constant rate. If the tap runs faster than the hole drains, the bucket eventually fills up and overflows.


A bucket of fixed capacity, with water flowing in and out. We cannot predict how much water will flow in, or how fast. But the bucket fixes the rate at which water flows out (the processing speed), thus achieving traffic shaping and flow control. Code implementation:

type LeakyBucket struct {

    rate       float64 // constant outflow rate per second

    capacity   float64 // capacity of the bucket

    water      float64 // current amount of water in the bucket

    lastLeakMs int64   // timestamp of the last leak, in milliseconds

    lock sync.Mutex

}

func (l *LeakyBucket) Allow() bool {

    l.lock.Lock()

    defer l.lock.Unlock()

    now := time.Now().UnixNano() / 1e6

    eclipse := float64(now-l.lastLeakMs) * l.rate / 1000 // leak water out first

    l.water = l.water - eclipse

    l.water = math.Max(0, l.water) // the bucket may have leaked dry (requires the "math" import)

    l.lastLeakMs = now

    if (l.water + 1) < l.capacity {

// Try to add water, and the water is not full

        l.water++

        return true

    } else {

// When the water is full, refuse to add water

        return false

    }

}

func (l *LeakyBucket) Set(r, c float64) {

    l.rate = r

    l.capacity = c

    l.water = 0

    l.lastLeakMs = time.Now().UnixNano() / 1e6

}


The leaky bucket algorithm has the following characteristics:

  • The bucket has a fixed capacity, and the outflow rate is a constant (requests flow out at a fixed rate)
  • If the bucket is empty, nothing flows out
  • Water can flow into the bucket at any rate (incoming requests)
  • If the incoming water would exceed the bucket's capacity, it overflows (new requests are rejected)

Because the leaky bucket fixes the outflow at a constant rate, the maximum service rate equals the outflow rate, and bursts cannot be served any faster.
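A quick driver for the LeakyBucket type above (my own usage sketch; it assumes the type sits in a package main with "fmt", "math", "sync" and "time" imported): with an outflow of 3 drops per second, a capacity of 3, and requests arriving every 100ms, the bucket quickly fills and only about 3 requests per second get through.

func main() {
    var l LeakyBucket
    l.Set(3, 3) // leak 3 drops per second, bucket capacity 3

    for i := 0; i < 20; i++ {
        fmt.Println("request", i, "allowed:", l.Allow())
        time.Sleep(100 * time.Millisecond)
    }
}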

5. Token bucket

Token Bucket algorithm is one of the most commonly used algorithms in Traffic Shaping and Rate Limiting. Typically, the token bucket algorithm is used to control the amount of data sent to the network and to allow the delivery of burst data.


We have a fixed-size bucket holding tokens. The bucket starts empty, and the system adds tokens to it at a fixed rate until it is full; further tokens are discarded. Each incoming request removes one token from the bucket; if the bucket is empty, the request is rejected or blocked.

Code implementation:

type TokenBucket struct {

    rate         int64 // fixed rate at which tokens are added, tokens/second

    capacity     int64 // capacity of the bucket

    tokens       int64 // current number of tokens in the bucket

    lastTokenSec int64 // timestamp (in seconds) when tokens were last added

    lock sync.Mutex

}

func (l *TokenBucket) Allow() bool {

    l.lock.Lock()

    defer l.lock.Unlock()

    now := time.Now().Unix()

    l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate

    if l.tokens > l.capacity {

        l.tokens = l.capacity

    }

    l.lastTokenSec = now

    if l.tokens > 0 {

// Get the token

        l.tokens--

        return true

    } else {

// If there is no token, reject it

        return false

    }

}

func (l *TokenBucket) Set(r, c int64) {

    l.rate = r

    l.capacity = c

    l.tokens = 0

    l.lastTokenSec = time.Now().Unix()

}


The token bucket has the following characteristics:

  • Tokens are placed into the token bucket at a constant rate
  • A bucket can hold a maximum of B tokens. When the bucket is full, newly added tokens are discarded or rejected
  • If there are fewer than N tokens in the bucket, no tokens are removed and the request is throttled (discarded or left waiting)

The token bucket limits the average inflow rate while permitting a degree of burst traffic: as long as tokens remain, requests are processed, and a request may take several tokens at once (3 tokens, 4 tokens, …). This makes it one of the most widely used rate-limiting algorithms.
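The "several tokens at once" behavior is easy to add to the TokenBucket type above — a sketch of my own, not part of the original code:

// AllowN reports whether n tokens can be taken at once, e.g. for a request
// whose cost is n times that of a normal one.
func (l *TokenBucket) AllowN(n int64) bool {
    l.lock.Lock()
    defer l.lock.Unlock()

    now := time.Now().Unix()
    l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate
    if l.tokens > l.capacity {
        l.tokens = l.capacity
    }
    l.lastTokenSec = now

    if l.tokens >= n {
        l.tokens -= n // take n tokens in one go
        return true
    }
    return false
}

For production use, the official extension package golang.org/x/time/rate provides a mature token-bucket limiter with the same Allow/AllowN idea.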

6. HTTP server connection limits

When writing an HTTP server in Go, you can use the officially maintained golang.org/x/net/netutil package:

package main

import (
    "log"
    "net"
    "net/http"

    "golang.org/x/net/netutil"
)

func main() {
    l, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        log.Fatalf("Listen: %v", err)
    }
    defer l.Close()

    const max = 100 // maximum number of concurrent connections
    l = netutil.LimitListener(l, max)

    // nil means http.DefaultServeMux handles the requests
    log.Fatal(http.Serve(l, nil))
}


The basic idea in the source is to count connections: a buffered channel whose capacity equals the maximum number of connections acts as a semaphore. Each Accept takes a slot (+1) and each Close releases one (-1); once the maximum is reached, Accept waits until a connection frees a slot.
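In sketch form, the pattern looks roughly like this (an illustrative reconstruction, not the actual x/net/netutil source; assumes "net" and "sync" are imported):

// limitListener wraps a net.Listener and allows at most cap(sem) active connections.
type limitListener struct {
    net.Listener
    sem chan struct{} // counting semaphore: one slot per allowed connection
}

func (l *limitListener) Accept() (net.Conn, error) {
    l.sem <- struct{}{} // acquire a slot; blocks while all slots are in use
    c, err := l.Listener.Accept()
    if err != nil {
        <-l.sem // accept failed: release the slot immediately
        return nil, err
    }
    return &limitConn{Conn: c, release: func() { <-l.sem }}, nil
}

// limitConn releases its slot when the connection is closed.
type limitConn struct {
    net.Conn
    once    sync.Once
    release func()
}

func (c *limitConn) Close() error {
    err := c.Conn.Close()
    c.once.Do(c.release) // release the slot exactly once
    return err
}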

7. Redis message queue traffic limiting

We all know that message queues provide asynchrony, decoupling, and peak shaving. Using a Redis List as a message queue with a fixed maximum length, producers keep pushing data in; when downstream consumers cannot keep up and the queue is full, new data from producers is simply discarded, which limits the flow. This raises a question: a channel can also be understood as a queue, so can a channel limit traffic in the same way?

When the message queue is full, upstream producers can no longer push data and will block. If you don't want to block, you have to check the queue length before enqueuing and discard the request when the queue is full. With a Go channel this is directly supported: len(ch) returns the number of buffered elements, and a select with a default branch performs a non-blocking send. So a channel can, in fact, limit traffic in the same way, as the sketch below shows.
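A minimal sketch of that idea (my own illustration): a buffered channel plays the role of the fixed-length queue, and a select with a default branch performs the non-blocking send, discarding the request when the buffer is full.

package main

import "fmt"

func main() {
    queue := make(chan int, 3) // fixed-length "message queue"

    for i := 0; i < 10; i++ {
        select {
        case queue <- i:
            fmt.Println("enqueued request", i)
        default:
            // the queue is full: drop the request instead of blocking
            fmt.Println("discarded request", i)
        }
    }

    fmt.Println("queued:", len(queue), "of", cap(queue))
}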

8. Summary

Much of the above is collected from around the Internet; organizing it makes the impression stick. Below I summarize, from my own understanding, the advantages and disadvantages of each rate-limiting approach, which should also make future selection easier.

Concurrency-control rate limiting:

  • Advantages: limits the number of concurrent consumers by controlling the number of goroutines;
  • Disadvantages: the use cases feel narrow — I have rarely seen it used this way, and since there is no fixed relationship between the number of goroutines and how much work each one handles, it is not a precise way to limit traffic.

Counter:

  • Advantages: counts within a fixed window; simple to implement; fine for scenarios that don't need precision;
  • Disadvantages: handles window boundaries poorly, so the limit cannot be controlled accurately.

Sliding window:

  • Advantages: slices the window into segments and is more precise than the plain counter; suitable for moderately accurate scenarios;
  • Disadvantages: slightly complex to implement, and it still does not completely solve the counter's boundary problem.

Leaky bucket:

  • Advantages: controls the consumption rate very well;
  • Disadvantages: slightly complex to implement, and since nothing beyond the fixed rate can be consumed per unit time, it is not very flexible.

Token bucket:

  • Advantages: solves the leaky bucket's inflexibility by allowing bursts while still capping over-consumption; strongly recommended;
  • Disadvantages: slightly complex to implement; no other downsides come to mind.

HTTP server connection limits:

  • Advantages: simple to implement, with direct third-party library support; good for limiting concurrent connections;
  • Disadvantages: it limits connections rather than request rate, so if I need rate limiting I probably won't reach for it.

Redis message queue:

  • Advantages: suits data-flow scenarios, often combined with async processing and decoupling; the queued data can be persisted;
  • Disadvantages: tied to Redis, which processes commands in a single thread (newer versions add I/O threads, but command execution is still single-threaded), so the length check and push are effectively serialized; with other message queues you would need locking for the concurrency control, which may not be worth the cost.

For more articles, follow the WeChat public account "Lou Zai advanced road" — follow along and don't get lost~