The idea of the funnel (leaky bucket) algorithm is to put every incoming request into a bucket first. If the bucket is not yet full, the request is accepted; if it is full, the request is rejected. Meanwhile, requests are taken out of the bucket at a constant rate for processing.

Requests can be temporarily stored in a queue; if the queue is full, the request is rejected. A periodic scheduled task then consumes the requests in the queue.

As can be seen from the implementation, no matter how large the instantaneous traffic is, requests are processed at a fixed rate, so the funnel algorithm provides traffic shaping.

Compared with the counter method, the funnel algorithm effectively avoids the jitter problem, but when the instantaneous request volume is large, subsequent requests are likely to time out because they are not processed in time.

package main

import (
	"context"
	"errors"
	"sync"
	"time"

	"github.com/google/uuid"
)

type Request interface{}

type Command struct {
	id      string
	request Request
}

type Micros int64

type Handler func(request Request) (error, interface{})

type Flow interface {
	Do(ctx context.Context, request Request, timeout time.Duration) (error, interface{})
}

type CommandResult struct {
	err    error
	result interface{}
}

func NewCommandResult(err error, resp interface{}) *CommandResult {
	return &CommandResult{
		err:    err,
		result: resp,
	}
}

type TokenBucket struct {
	commandInterval Micros
	commands        chan *Command
	maxBucket       int
	ticker          *time.Ticker
	l               sync.Mutex
	handler         Handler
	dones           map[string]chan *CommandResult
}

func NewCommand(request Request) *Command {
	return &Command{
		id:      GetUUID(),
		request: request,
	}
}

func GetUUID() string {
	uid, err := uuid.NewUUID()
	if err != nil {
		panic(errors.New("uuid get fail"))
	}
	return uid.String()
}

// register creates (or returns) the result channel for a command id.
func (token *TokenBucket) register(id string) chan *CommandResult {
	token.l.Lock()
	defer token.l.Unlock()
	done := token.dones[id]
	if done == nil {
		done = make(chan *CommandResult, 1)
		token.dones[id] = done
	}
	return done
}

func (token *TokenBucket) isRegister(id string) bool {
	token.l.Lock()
	defer token.l.Unlock()
	_, ok := token.dones[id]
	return ok
}

// trigger delivers a result (possibly nil) and removes the registration.
func (token *TokenBucket) trigger(id string, v *CommandResult) {
	token.l.Lock()
	ch := token.dones[id]
	delete(token.dones, id)
	token.l.Unlock()
	if ch != nil {
		ch <- v
		close(ch)
	}
}

func (token *TokenBucket) Do(ctx context.Context, request Request, timeout time.Duration) (error, interface{}) {
	command := NewCommand(request)
	done := token.register(command.id)
	select {
	case token.commands <- command:
	default:
		// Bucket is full: clean up the registration and reject the request.
		println("time:" + time.Now().String() + "," + request.(string))
		token.trigger(command.id, nil)
		return errors.New("trigger stream limit"), nil
	}
	cctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	select {
	case resp := <-done:
		return resp.err, resp.result
	case <-cctx.Done():
		println("request timeout, time:" + time.Now().String() + "," + request.(string))
		token.trigger(command.id, nil)
		return errors.New("request timeout, time:" + time.Now().String()), nil
	}
}

// start drains at most one command per tick, so requests are processed
// at a fixed rate regardless of how fast they arrive.
func (token *TokenBucket) start() {
	for range token.ticker.C {
		select {
		case command := <-token.commands:
			if token.isRegister(command.id) {
				err, resp := token.handler(command.request)
				token.trigger(command.id, NewCommandResult(err, resp))
			}
		default: // no pending request at this tick
		}
	}
}

func NewTokenBucket(maxBucket int /* maximum number of requests in the bucket */, commandInterval Micros /* interval at which requests are processed */, handler Handler /* request processing method */) Flow {
	tokenBucket := &TokenBucket{
		commandInterval: commandInterval,
		commands:        make(chan *Command, maxBucket),
		maxBucket:       maxBucket,
		handler:         handler,
		dones:           make(map[string]chan *CommandResult),
		ticker:          time.NewTicker(time.Microsecond * time.Duration(commandInterval)),
	}
	go tokenBucket.start()
	return tokenBucket
}

Example usage:

token := NewTokenBucket(5, 100000, func(request Request) (e error, i interface{}) {
	println("handler:" + request.(string) + "," + time.Now().String())
	return nil, request
})
for t := 0; t < 10; t++ {
	token.Do(context.Background(), "test"+strconv.Itoa(t), time.Millisecond*500)
}