There are three tools you can use to protect a high-concurrency system: caching, service degradation, and rate limiting. To keep an online system flexible and stable during peak hours, the most effective approach is service degradation, and rate limiting is one of the degradation techniques most often adopted.

A recommended open source library is github.com/didip/tollbooth. However, if you want something simple and lightweight, or just want to learn, it is not difficult to implement your own middleware to handle rate limits. Today we will look at how to write our own rate-limiting middleware.

The first step is to install the package that provides the token bucket algorithm, on which the tollbooth implementation is also based:

  1. $ go get golang.org/x/time/rate



Ok, let’s look at the implementation of the Demo code:

limit.go

  1. package main
  2. import (
  3. “net/http”
  4. “golang.org/x/time/rate”
  5. )
  6. var limiter = rate.NewLimiter(2, 5)
  7. func limit(next http.Handler) http.Handler {
  8. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request ) {
  9. if limiter.Allow() == false {
  10. http.Error(w, http.StatusText(429 ), http.StatusTooManyRequests)
  11. return
  12. }
  13. next.ServeHTTP(w, r)
  14. })
  15. }



main.go

  1. package main
  2. import (
  3. “net/http”
  4. )
  5. func main() {
  6. mux := http.NewServeMux()
  7. mux.HandleFunc(“/”, okHandler)
  8. // Wrap the servemux with the limit middleware.
  9. http.ListenAndServe(“:4000”, limit( mux))
  10. }
  // okHandler answers every request with the body "OK".
  func okHandler(w http.ResponseWriter, r *http.Request) {
  	// The write result is discarded on purpose: the handler has no way to
  	// report a failed write back to the client.
  	_, _ = w.Write([]byte("OK"))
  }



 

For reference, here is the source of the golang.org/x/time/rate package, including rate.NewLimiter:

  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package rate provides a rate limiter.
  5. package rate
  6. import (
  7. “fmt”
  8. “math”
  9. “sync”
  10. “time”
  11. “golang.org/x/net/context”
  12. )
  // Limit defines the maximum frequency of some events.
  // Limit is represented as number of events per second.
  // A zero Limit allows no events.
  type Limit float64

  // Inf is the infinite rate limit; it allows all events (even if burst is zero).
  const Inf = Limit(math.MaxFloat64)
  19. // Every converts a minimum time interval between events to a Limit.
  20. func Every(interval time.Duration) Limit {
  21. if interval <= 0 {
  22. return Inf
  23. }
  24. return 1 / Limit(interval.Seconds ())
  25. }
  26. // A Limiter controls how frequently events are allowed to happen.
  27. // It implements a “token bucket” of size b, initially full and refilled
  28. // at rate r tokens per second.
  29. // Informally, in any large enough time interval, the Limiter limits the
  30. // rate to r tokens per second, with a maximum burst size of b events.
  31. // As a special case, if r == Inf (the infinite rate), b is ignored.
  32. // See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
  33. //
  34. // The zero value is a valid Limiter, but it will reject all events.
  35. // Use NewLimiter to create non-zero Limiters.
  36. //
  37. // Limiter has three main methods, Allow, Reserve, and Wait.
  38. // Most callers should use Wait.
  39. //
  40. // Each of the three methods consumes a single token.
  41. // They differ in their behavior when no token is available.
  42. // If no token is available, Allow returns false.
  43. // If no token is available, Reserve returns a reservation for a future token
  44. // and the amount of time the caller must wait before using it.
  45. // If no token is available, Wait blocks until one can be obtained
  46. // or its associated context.Context is canceled.
  47. //
  48. // The methods AllowN, ReserveN, and WaitN consume n tokens.
  49. type Limiter struct {
  50. limit Limit
  51. burst int
  52. mu sync.Mutex
  53. tokens float64
  54. // last is the last time the limiter’s tokens field was updated
  55. last time.Time
  56. // lastEvent is the latest time of a rate-limited event (past or future)
  57. lastEvent time.Time
  58. }
  59. // Limit returns the maximum overall event rate.
  60. func (lim *Limiter) Limit () Limit {
  61. lim.mu.Lock()
  62. defer lim.mu.Unlock()
  63. return lim.limit
  64. }
  65. // Burst returns the maximum burst size. Burst is the maximum number of tokens
  66. // that can be consumed in a single call to Allow, Reserve, or Wait, so higher
  67. // Burst values allow more events to happen at once.
  68. // A zero Burst allows no events, unless limit == Inf.
  69. func (lim *Limiter) Burst () int {
  70. return lim.burst
  71. }
  72. // NewLimiter returns a new Limiter that allows events up to rate r and permits
  73. // bursts of at most b tokens.
  74. func NewLimiter(r Limit, b int) *Limiter {
  75. return &Limiter{
  76. limit: r,
  77. burst: b,
  78. }
  79. }
  80. // Allow is shorthand for AllowN(time.Now(), 1).
  81. func (lim *Limiter) Allow () bool {
  82. return lim.AllowN(time.Now() , 1)
  83. }
  84. // AllowN reports whether n events may happen at time now.
  85. // Use this method if you intend to drop / skip events that exceed the rate limit.
  86. // Otherwise use Reserve or Wait.
  87. func (lim *Limiter) AllowN (now time.Time, n int) bool {
  88. return lim.reserveN(now, n, 0).ok
  89. }
  90. // A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
  91. // A Reservation may be canceled, which may enable the Limiter to permit additional events.
  92. type Reservation struct {
  93. ok bool
  94. lim *Limiter
  95. tokens int
  96. timeToAct time.Time
  97. // This is the Limit at reservation time, it can change later.
  98. limit Limit
  99. }
  100. // OK returns whether the limiter can provide the requested number of tokens
  101. // within the maximum wait time. If OK is false, Delay returns InfDuration, and
  102. // Cancel does nothing.
  103. func (r *Reservation) OK () bool {
  104. return r.ok
  105. }
  106. // Delay is shorthand for DelayFrom(time.Now()).
  107. func (r *Reservation) Delay () time.Duration {
  108. return r.DelayFrom(time.Now() )
  109. }
  // InfDuration is the duration returned by Delay when a Reservation is not OK.
  // It is the largest value a time.Duration can hold.
  const InfDuration = time.Duration(1<<63 - 1)
  112. // DelayFrom returns the duration for which the reservation holder must wait
  113. // before taking the reserved action. Zero duration means act immediately.
  114. // InfDuration means the limiter cannot grant the tokens requested in this
  115. // Reservation within the maximum wait time.
  116. func (r *Reservation) DelayFrom (now time.Time) time.Duration {
  117. if ! r.ok {
  118. return InfDuration
  119. }
  120. delay := r.timeToAct.Sub(now)
  121. if delay < 0 {
  122. return 0
  123. }
  124. return delay
  125. }
  126. // Cancel is shorthand for CancelAt(time.Now()).
  127. func (r *Reservation) Cancel () {
  128. r.CancelAt(time.Now())
  129. return
  130. }
  131. // CancelAt indicates that the reservation holder will not perform the reserved action
  132. // and reverses the effects of this Reservation on the rate limit as much as possible,
  133. // considering that other reservations may have already been made.
  134. func (r *Reservation) CancelAt (now time.Time) {
  135. if ! r.ok {
  136. return
  137. }
  138. r.lim.mu.Lock()
  139. defer r.lim.mu.Unlock()
  140. if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now ) {
  141. return
  142. }
  143. // calculate tokens to restore
  144. // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
  145. // after r was obtained. These tokens should not be restored.
  146. restoreTokens := float64(r.tokens) – r.limit.tokensFromDuration (r.lim.lastEvent.Sub(r.timeToAct))
  147. if restoreTokens <= 0 {
  148. return
  149. }
  150. // advance time to now
  151. now, _, tokens := r.lim.advance(now)
  152. // calculate new number of tokens
  153. tokens += restoreTokens
  154. if burst := float64(r.lim.burst); tokens > burst {
  155. tokens = burst
  156. }
  157. // update state
  158. r.lim.last = now
  159. r.lim.tokens = tokens
  160. if r.timeToAct == r.lim.lastEvent {
  161. prevEvent := r.timeToAct.Add(r.limit.durationFromTokens( float64(-r.tokens)))
  162. if ! prevEvent.Before(now) {
  163. r.lim.lastEvent = prevEvent
  164. }
  165. }
  166. return
  167. }
  168. // Reserve is shorthand for ReserveN(time.Now(), 1).
  169. func (lim *Limiter) Reserve () *Reservation {
  170. return lim.ReserveN(time.Now() , 1)
  171. }
  172. // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
  173. // The Limiter takes this Reservation into account when allowing future events.
  174. // ReserveN returns false if n exceeds the Limiter’s burst size.
  175. // Usage example:
  176. // r, ok := lim.ReserveN(time.Now(), 1)
  177. // if ! ok {
  178. // // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
  179. // }
  180. // time.Sleep(r.Delay())
  181. // Act()
  182. // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
  183. // If you need to respect a deadline or cancel the delay, use Wait instead.
  184. // To drop or skip events exceeding rate limit, use Allow instead.
  185. func (lim *Limiter) ReserveN (now time.Time, n int) *Reservation {
  186. r := lim.reserveN(now, n, InfDuration)
  187. return &r
  188. }
  189. // Wait is shorthand for WaitN(ctx, 1).
  190. func (lim *Limiter) Wait (ctx context.Context) (err error) {
  191. return lim.WaitN(ctx, 1)
  192. }
  193. // WaitN blocks until lim permits n events to happen.
  194. // It returns an error if n exceeds the Limiter’s burst size, the Context is
  195. // canceled, or the expected wait time exceeds the Context’s Deadline.
  196. func (lim *Limiter) WaitN (ctx context.Context, n int) (err error) {
  197. if n > lim.burst {
  198. return fmt.Errorf(“rate: Wait(n=%d) exceeds limiter’s burst %d”, n, lim.burst )
  199. }
  200. // Check if ctx is already cancelled
  201. select {
  202. case <-ctx.Done():
  203. return ctx.Err()
  204. default:
  205. }
  206. // Determine wait limit
  207. now := time.Now()
  208. waitLimit := InfDuration
  209. if deadline, ok := ctx.Deadline(); ok {
  210. waitLimit = deadline.Sub(now)
  211. }
  212. // Reserve
  213. r := lim.reserveN(now, n, waitLimit)
  214. if ! r.ok {
  215. return fmt.Errorf(“rate: Wait(n=%d) would exceed context deadline”, n )
  216. }
  217. // Wait
  218. t := time.NewTimer(r.DelayFrom(now ))
  219. defer t.Stop()
  220. select {
  221. case <-t.C:
  222. // We can proceed.
  223. return nil
  224. case <-ctx.Done():
  225. // Context was canceled before we could proceed. Cancel the
  226. // reservation, which may permit other events to proceed sooner.
  227. r.Cancel()
  228. return ctx.Err()
  229. }
  230. }
  231. // SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
  232. func (lim *Limiter) SetLimit (newLimit Limit) {
  233. lim.SetLimitAt(time.Now() , newLimit)
  234. }
  235. // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
  236. // or underutilized by those which reserved (using Reserve or Wait) but did not yet act
  237. // before SetLimitAt was called.
  238. func (lim *Limiter) SetLimitAt (now time.Time, newLimit Limit) {
  239. lim.mu.Lock()
  240. defer lim.mu.Unlock()
  241. now, _, tokens := lim.advance(now)
  242. lim.last = now
  243. lim.tokens = tokens
  244. lim.limit = newLimit
  245. }
  246. // reserveN is a helper method for AllowN, ReserveN, and WaitN.
  247. // maxFutureReserve specifies the maximum reservation wait duration allowed.
  248. // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
  249. func (lim *Limiter) reserveN (now time.Time, n int, maxFutureReserve time.Duration) Reservation {
  250. lim.mu.Lock()
  251. defer lim.mu.Unlock()
  252. if lim.limit == Inf {
  253. return Reservation{
  254. ok: true,
  255. lim: lim,
  256. tokens: n,
  257. timeToAct: now,
  258. }
  259. }
  260. now, last, tokens := lim.advance(now)
  261. // Calculate the remaining number of tokens resulting from the request.
  262. tokens -= float64(n)
  263. // Calculate the wait duration
  264. var waitDuration time.Duration
  265. if tokens < 0 {
  266. waitDuration = lim.limit.durationFromTokens(-tokens)
  267. }
  268. // Decide result
  269. ok := n <= lim.burst && waitDuration <= maxFutureReserve
  270. // Prepare reservation
  271. r := Reservation{
  272. ok: ok,
  273. lim: lim,
  274. limit: lim.limit,
  275. }
  276. if ok {
  277. r.tokens = n
  278. r.timeToAct = now.Add(waitDuration)
  279. }
  280. // Update state
  281. if ok {
  282. lim.last = now
  283. lim.tokens = tokens
  284. lim.lastEvent = r.timeToAct
  285. } else {
  286. lim.last = last
  287. }
  288. return r
  289. }
  290. // advance calculates and returns an updated state for lim resulting from the passage of time.
  291. // lim is not changed.
  292. func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64 ) {
  293. last := lim.last
  294. if now.Before(last) {
  295. last = now
  296. }
  297. // Avoid making delta overflow below when last is very old.
  298. maxElapsed := lim.limit.durationFromTokens(float64( lim.burst) – lim.tokens)
  299. elapsed := now.Sub(last)
  300. if elapsed > maxElapsed {
  301. elapsed = maxElapsed
  302. }
  303. // Calculate the new number of tokens, due to time that passed.
  304. delta := lim.limit.tokensFromDuration(elapsed)
  305. tokens := lim.tokens + delta
  306. if burst := float64(lim.burst); tokens > burst {
  307. tokens = burst
  308. }
  309. return now, last, tokens
  310. }
  311. // durationFromTokens is a unit conversion function from the number of tokens to the duration
  312. // of time it takes to accumulate them at a rate of limit tokens per second.
  313. func (limit Limit) durationFromTokens(tokens float64) time.Duration {
  314. seconds := tokens / float64(limit)
  315. return time.Nanosecond * time.Duration(1e9*seconds)
  316. }
  317. // tokensFromDuration is a unit conversion function from a time duration to the number of tokens
  318. // which could be accumulated during that duration at a rate of limit tokens per second.
  319. func (limit Limit) tokensFromDuration(d time.Duration) float64 {
  320. return d.Seconds() * float64 (limit)
  321. }



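Algorithm description: if the average rate is set to r, one token is added to the bucket every 1/r seconds (that is, r tokens are added every second). The bucket can store at most b tokens; a token that arrives while the bucket is full is discarded. Each event consumes one token, and when no token is available the event is rejected (Allow), scheduled for later (Reserve), or blocked until a token is available (Wait).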

Implementing per-user rate limiting

While using a single global rate limiter can be useful in some cases, it is also common to enforce rate limiters for each user based on identifiers such as IP addresses or API keys. We will use the IP address as the identifier. The simple implementation code is as follows:

  1. package main
  import (
  	"net"
  	"net/http"
  	"sync"
  	"time"

  	"golang.org/x/time/rate"
  )
  8. // Create a custom visitor struct which holds the rate limiter for each
  9. // visitor and the last time that the visitor was seen.
  10. type visitor struct {
  11. limiter *rate.Limiter
  12. lastSeen time.Time
  13. }
  14. // Change the the map to hold values of the type visitor.
  15. var visitors = make(map[string] *visitor)
  16. var mtx sync.Mutex
  17. // Run a background goroutine to remove old entries from the visitors map.
  18. func init() {
  19. go cleanupVisitors()
  20. }
  21. func addVisitor(ip string) *rate.Limiter {
  22. limiter := rate.NewLimiter(2, 5)
  23. mtx.Lock()
  24. // Include the current time when creating a new visitor.
  25. visitors[ip] = &visitor{limiter, time. Now()}
  26. mtx.Unlock()
  27. return limiter
  28. }
  29. func getVisitor(ip string) *rate.Limiter {
  30. mtx.Lock()
  31. v, exists := visitors[ip]
  32. if ! exists {
  33. mtx.Unlock()
  34. return addVisitor(ip)
  35. }
  36. // Update the last seen time for the visitor.
  37. v.lastSeen = time.Now()
  38. mtx.Unlock()
  39. return v.limiter
  40. }
  41. // Every minute check the map for visitors that haven’t been seen for
  42. // more than 3 minutes and delete the entries.
  43. func cleanupVisitors() {
  44. for {
  45. time.Sleep(time.Minute)
  46. mtx.Lock()
  47. for ip, v := range visitors {
  48. if time.Now().Sub( v.lastSeen) > 3*time.Minute {
  49. delete(visitors, ip)
  50. }
  51. }
  52. mtx.Unlock()
  53. }
  54. }
  55. func limit(next http.Handler) http.Handler {
  56. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request ) {
  57. limiter := getVisitor(r.RemoteAddr)
  58. if limiter.Allow() == false {
  59. http.Error(w, http.StatusText(429 ), http.StatusTooManyRequests)
  60. return
  61. }
  62. next.ServeHTTP(w, r)
  63. })
  64. }



Of course, this is a simple implementation; there is a lot more to consider if we want to implement rate limiting in an API gateway for microservices. I suggest you read the source code of github.com/didip/tollbooth.

Tip the author


Your support will encourage us to continue to create!

WeChat pay
Alipay

Scan the QR code with wechat to tip

Scan the QR code with Alipay

WeChat
Sina Weibo
Qzone
Evernote
Facebook
Twitter
Email
Telegram
share