Introduction

In this article, we will take a look at the source code of ants.

Pool

From the previous article, we learned that there are two ways to create ants pools:

  • p, _ := ants.NewPool(cap): a pool created this way submits tasks by calling p.Submit(task), where task is a function with no parameters and no return value;
  • p, _ := ants.NewPoolWithFunc(cap, func(interface{})): a pool created this way specifies a pool function at creation time, and tasks are submitted with p.Invoke(arg); arg is the argument passed to the pool function func(interface{}). A minimal usage sketch of both styles follows this list.
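
Below is a minimal usage sketch of both creation styles (the capacity 10 and the closing sleep are arbitrary choices; error handling is omitted):

package main

import (
  "fmt"
  "time"

  "github.com/panjf2000/ants"
)

func main() {
  // Style 1: a plain Pool; every submitted task is a func().
  p1, _ := ants.NewPool(10)
  defer p1.Release()
  _ = p1.Submit(func() {
    fmt.Println("task executed by a plain Pool")
  })

  // Style 2: a PoolWithFunc; the function is fixed, only the argument varies.
  p2, _ := ants.NewPoolWithFunc(10, func(arg interface{}) {
    fmt.Printf("pool function received: %v\n", arg)
  })
  defer p2.Release()
  _ = p2.Invoke(42)

  time.Sleep(100 * time.Millisecond) // crude wait: tasks run asynchronously
}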

In ants, the two kinds of pool are represented by different structures: ants.Pool and ants.PoolWithFunc. Let’s start with Pool. The PoolWithFunc structure is similar; after introducing Pool, we will briefly compare the two.

The Pool structure is defined in the file pool.go:

// src/github.com/panjf2000/ants/pool.go
type Pool struct {
  capacity    int32
  running     int32
  workers     workerArray
  state       int32
  lock        sync.Locker
  cond        *sync.Cond
  workerCache sync.Pool
  blockingNum int
  options     *Options
}

The meanings of the fields are as follows:

  • capacity: the pool capacity, i.e. the maximum number of goroutines ants can create. A negative value means the capacity is unlimited;
  • running: the number of worker goroutines currently running;
  • workers: stores the set of worker objects; workerArray is just an interface representing a worker container, more on that later;
  • state: records the current state of the pool, i.e. whether it is closed (CLOSED);
  • lock: a lock. ants implements its own spin lock, used to synchronize concurrent operations;
  • cond: a condition variable, used to handle task waiting and wake-up;
  • workerCache: a sync.Pool used to create and manage worker objects, improving performance;
  • blockingNum: the number of tasks blocked waiting for a worker;
  • options: the options, covered in detail in the previous article.

One concept to keep clear: in ants, each task is handled by a worker object, and each worker object starts a goroutine to process tasks. Workers in ants are represented by the goWorker structure:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool        *Pool
  task        chan func()
  recycleTime time.Time
}

More on this later; for now we just need to know that the Pool.workers field is the container that holds goWorker objects.

Creating the Pool

To create a Pool object, call ants.NewPool(size, options). With some of the option-processing code omitted, the function looks like this:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  // ...
  p := &Pool{
    capacity: int32(size),
    lock:     internal.NewSpinLock(),
    options:  opts,
  }
  p.workerCache.New = func() interface{} {
    return &goWorker{
      pool: p,
      task: make(chan func(), workerChanCap),
    }
  }
  if p.options.PreAlloc {
    if size == -1 {
      return nil, ErrInvalidPreAllocSize
    }
    p.workers = newWorkerArray(loopQueueType, size)
  } else {
    p.workers = newWorkerArray(stackType, 0)
  }

  p.cond = sync.NewCond(p.lock)

  go p.purgePeriodically()
  return p, nil
}

The code is not hard to understand:

  • Create the Pool object, set the capacity, create a spin lock to initialize the lock field, and set the options;
  • Set the New method of the workerCache sync.Pool: when workerCache.Get() finds no cached worker object, it calls this method to create one (see the sketch after this list);
  • Create different types of workers containers depending on whether the pre-allocation option is set;
  • Create a condition variable backed by p.lock;
  • Finally, start a goroutine that periodically cleans up expired workers.
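
If the sync.Pool pattern is unfamiliar, here is a standalone sketch (independent of ants) showing that Get() only falls back to New when nothing is cached:

package main

import (
  "fmt"
  "sync"
)

func main() {
  var pool sync.Pool
  // New runs only when Get() finds no cached object.
  pool.New = func() interface{} {
    fmt.Println("creating a new object")
    return new(int)
  }

  x := pool.Get().(*int) // pool is empty, so New runs
  *x = 42
  pool.Put(x) // return the object to the pool

  y := pool.Get().(*int) // likely reuses x; New usually does not run here
  fmt.Println(*y)
}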

The Pool.workers field is of type workerArray, which is actually an interface representing a worker container:

type workerArray interface {
  len() int
  isEmpty() bool
  insert(worker *goWorker) error
  detach() *goWorker
  retrieveExpiry(duration time.Duration) []*goWorker
  reset()
}

The method names speak for themselves:

  • len() int: the number of workers;
  • isEmpty() bool: whether the number of workers is 0;
  • insert(worker *goWorker) error: puts a worker back into the workerArray after its goroutine finishes a task;
  • detach() *goWorker: retrieves a worker from the workerArray;
  • retrieveExpiry(duration time.Duration) []*goWorker: removes all expired workers;
  • reset(): resets the container.

workerArray has two implementations in ants: workerStack and loopQueue.

workerStack

Let’s start with the workerStack, which is located in the file worker_stack.go:

// src/github.com/panjf2000/ants/worker_stack.go
type workerStack struct {
  items  []*goWorker
  expiry []*goWorker
  size   int
}

func newWorkerStack(size int) *workerStack {
  return &workerStack{
    items: make([]*goWorker, 0, size),
    size:  size,
  }
}

  • items: idle workers;
  • expiry: expired workers.

After a goroutine finishes its task, the Pool puts the corresponding worker back into the workerStack by calling workerStack.insert(), which simply appends the worker to items:

func (wq *workerStack) insert(worker *goWorker) error {
  wq.items = append(wq.items, worker)
  return nil
}

When a new task arrives, workerStack.detach() is called to retrieve an idle worker from the container:

func (wq *workerStack) detach() *goWorker {
  l := wq.len()
  if l == 0 {
    return nil
  }

  w := wq.items[l-1]
  wq.items[l-1] = nil // avoid memory leaks
  wq.items = wq.items[:l-1]

  return w
}

detach() always returns the last worker, and insert() always appends to the end, which matches the last-in-first-out behavior of a stack, hence the name workerStack.

One detail here: since a slice is backed by an array, elements of that array cannot be garbage-collected as long as the array is still referenced. So after taking the last element of the slice, the code actively sets the corresponding array slot to nil to release the reference.
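
The same pattern applies to any slice-backed stack of pointers; a minimal standalone sketch:

package main

import "fmt"

type item struct{ id int }

// pop removes and returns the top of the stack, clearing the vacated
// array slot so the popped item can be garbage-collected later.
func pop(stack []*item) ([]*item, *item) {
  n := len(stack)
  if n == 0 {
    return stack, nil
  }
  top := stack[n-1]
  stack[n-1] = nil // release the reference held by the backing array
  return stack[:n-1], top
}

func main() {
  stack := []*item{{1}, {2}, {3}}
  stack, top := pop(stack)
  fmt.Println(top.id, len(stack)) // 3 2
}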

As mentioned above, when a new Pool object is created, a goroutine is started to periodically check for and clean up expired workers. It calls workerArray.retrieveExpiry() to obtain the list of expired workers. The workerStack implementation is as follows:

func (wq *workerStack) retrieveExpiry(duration time.Duration) []*goWorker {
  n := wq.len()
  if n == 0 {
    return nil
  }

  expiryTime := time.Now().Add(-duration)
  index := wq.binarySearch(0, n-1, expiryTime)

  wq.expiry = wq.expiry[:0]
  if index != -1 {
    wq.expiry = append(wq.expiry, wq.items[:index+1]...)
    m := copy(wq.items, wq.items[index+1:])
    for i := m; i < n; i++ {
      wq.items[i] = nil
    }
    wq.items = wq.items[:m]
  }
  return wq.expiry
}

The implementation uses binary search to find the most recently expired worker. Since a worker's recycleTime is set when its goroutine finishes a task, the order in which workerStack.insert() appends workers guarantees that their expiry times are sorted from oldest to newest, so binary search can be used:

func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
  var mid int
  for l <= r {
    mid = (l + r) / 2
    if expiryTime.Before(wq.items[mid].recycleTime) {
      r = mid - 1
    } else {
      l = mid + 1
    }
  }
  return r
}

In other words, the binary search returns the index of the most recently expired worker: it sits just before the first worker that has not yet expired, and it and every worker before it have all expired.

If such an index is found, all workers from the beginning of the slice up to index (inclusive) are copied into the expiry field. Then all unexpired workers after index are copied to the head of the slice using the copy function, which returns the number of elements actually copied, i.e. the number of unexpired workers m. Next, the slots from m onward are set to nil to avoid memory leaks, since those workers have already been copied toward the head. Finally, items is truncated to length m and the slice of expired workers is returned. The standalone trace below walks through a concrete example.
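
A standalone trace of the same copy-and-truncate logic on plain ints (the numbers are made up; smaller means older):

package main

import "fmt"

func main() {
  items := []int{1, 2, 3, 5, 8} // sorted recycle times, as workerStack guarantees
  index := 2                    // binary search result: items[2] is the newest expired entry

  expiry := append([]int(nil), items[:index+1]...) // expired part: [1 2 3]
  m := copy(items, items[index+1:])                // move survivors to the front, m == 2
  for i := m; i < len(items); i++ {
    items[i] = 0 // for pointers this would be nil, to avoid leaks
  }
  items = items[:m]

  fmt.Println(expiry, items) // [1 2 3] [5 8]
}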

loopQueue

The loopQueue implementation is based on a circular queue; the structure is defined in the file worker_loop_queue.go:

type loopQueue struct {
  items  []*goWorker
  expiry []*goWorker
  head   int
  tail   int
  size   int
  isFull bool
}

func newWorkerLoopQueue(size int) *loopQueue {
  return &loopQueue{
    items: make([]*goWorker, size),
    size:  size,
  }
}

Since it is a circular queue, a slice of length size is created up front. A circular queue has a head pointer, head, pointing at the first slot that holds an element, and a tail pointer, tail, pointing at the next free slot. Initially both are 0.

Elements are added at tail, after which the tail pointer moves back; elements are taken at head, after which the head pointer moves back. When head or tail reaches the end of the slice, it wraps around to the front, so after running for a while tail may end up sitting in front of head.

When the tail pointer catches up with the head pointer, the queue is full; when the head pointer catches up with the tail pointer, the queue is empty again.

Walking through these states makes it clear how loopQueue operates; the sketch below traces the index arithmetic.
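
Here is a standalone ring buffer of ints (not the ants code) that traces the same wraparound arithmetic:

package main

import "fmt"

type ring struct {
  items      []int
  head, tail int
  isFull     bool
}

func (r *ring) insert(v int) bool {
  if r.isFull {
    return false
  }
  r.items[r.tail] = v
  r.tail = (r.tail + 1) % len(r.items) // wrap around at the end
  r.isFull = r.tail == r.head          // tail caught up with head: full
  return true
}

func (r *ring) detach() (int, bool) {
  if r.head == r.tail && !r.isFull { // head caught up with tail: empty
    return 0, false
  }
  v := r.items[r.head]
  r.head = (r.head + 1) % len(r.items)
  r.isFull = false
  return v, true
}

func main() {
  r := &ring{items: make([]int, 3)}
  r.insert(1)
  r.insert(2)
  r.insert(3)                 // full: tail wrapped around to 0 == head
  fmt.Println(r.insert(4))    // false
  v, _ := r.detach()          // head advances, no longer full
  fmt.Println(v, r.insert(4)) // 1 true
}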

The loopQueue adds an isFull field because head == tail can mean either empty or full. After a goroutine finishes its task, the pool puts the corresponding worker object back into the loopQueue via insert():

func (wq *loopQueue) insert(worker *goWorker) error {
  if wq.size == 0 {
    return errQueueIsReleased
  }

  if wq.isFull {
    return errQueueIsFull
  }
  wq.items[wq.tail] = worker
  wq.tail++

  if wq.tail == wq.size {
    wq.tail = 0
  }
  if wq.tail == wq.head {
    wq.isFull = true
  }

  return nil
}

If tail == head after the insertion, the queue is full and the isFull field is set.

When a new task arrives, loopQueue.detach() is called to get an idle worker:

func (wq *loopQueue) detach() *goWorker {
  if wq.isEmpty() {
    return nil
  }

  w := wq.items[wq.head]
  wq.items[wq.head] = nil
  wq.head++
  if wq.head == wq.size {
    wq.head = 0
  }
  wq.isFull = false

  return w
}

This method corresponds to dequeuing. Note that after a dequeue the queue can no longer be full, so isFull must be reset to false.

As in workerStack, workers closer to the head expire earlier and workers closer to the tail expire later. Retrieving expired workers works much like in workerStack, except that binary search is not used. I won't go into the details here, but the rough sketch below illustrates the idea.
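
As a rough illustration only, and not the library's actual code: assuming workers sit oldest-first starting at head, expiry retrieval on the ring can be a linear scan from the head:

// Sketch of the idea: keep detaching from head while the head worker
// has been idle longer than duration. This is an assumption-based
// simplification, not the ants implementation.
func (wq *loopQueue) retrieveExpirySketch(duration time.Duration) []*goWorker {
  expiryTime := time.Now().Add(-duration)
  wq.expiry = wq.expiry[:0]
  for !wq.isEmpty() && wq.items[wq.head].recycleTime.Before(expiryTime) {
    wq.expiry = append(wq.expiry, wq.detach())
  }
  return wq.expiry
}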

Back to creating the Pool

Recall the snippet in NewPool() that chooses the workerArray implementation based on the pre-allocation option:

if p.options.PreAlloc {
  if size == -1 {
    return nil, ErrInvalidPreAllocSize
  }
  p.workers = newWorkerArray(loopQueueType, size)
} else {
  p.workers = newWorkerArray(stackType, 0)
}

newWorkerArray() is defined in the file worker_array.go:

type arrayType int

const (
  stackType arrayType = 1 << iota
  loopQueueType
)

func newWorkerArray(aType arrayType, size int) workerArray {
  switch aType {
  case stackType:
    return newWorkerStack(size)
  case loopQueueType:
    return newWorkerLoopQueue(size)
  default:
    return newWorkerStack(size)
  }
}

That is, if the pre-allocation option is set, the loopQueue structure is used; otherwise, the stack structure is used.

The worker structure

After covering Pool creation and its structure, let's look at the worker structure. In ants, a worker is represented by the goWorker structure, defined in the file worker.go. It is very simple:

// src/github.com/panjf2000/ants/worker.go
type goWorker struct {
  pool        *Pool
  task        chan func()
  recycleTime time.Time
}

The field meanings are obvious:

  • pool: holds a reference to the goroutine pool;
  • task: the task channel, of type chan func(), through which tasks are sent to the goWorker;
  • recycleTime: records when the goWorker is put back into the pool (i.e. when it becomes idle); it is set when the worker finishes its task and is returned to the pool.

When a goWorker is created, its run() method is called, which starts a new goroutine to process tasks. The main flow of run() is very simple:

func (w *goWorker) run() {
  go func() {
    for f := range w.task {
      if f == nil {
        return
      }
      f()
      if ok := w.pool.revertWorker(w); !ok {
        return
      }
    }
  }()
}

This method starts a new goroutine that continually receives tasks from the task channel and executes them, then calls the pool's revertWorker() method to put the goWorker object back into the pool so it can be reused for the next task. revertWorker() is examined in more detail later.

Note that the loop for f := range w.task does not terminate until the task channel is closed or a nil task is received. So the goroutine keeps running, which is the key to ants' high performance: each goWorker starts a goroutine only once and then reuses it, executing one task at a time and being put back into the pool afterwards.

One more detail: if putting the worker back fails, return is called, which ends the goroutine and prevents a leak.

The f == nil check returning is also a deliberate detail; we will come back to it when discussing cleanup and pool closing.

Let’s look at the exception handling of the run() method:

defer func() {
  w.pool.workerCache.Put(w)
  if p := recover(); p != nil {
    if ph := w.pool.options.PanicHandler; ph != nil {
      ph(p)
    } else {
      w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
      var buf [4096]byte
      n := runtime.Stack(buf[:], false)
      w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
    }
  }
  w.pool.cond.Signal()
}()

In a nutshell, recover() inside the deferred function captures any panic thrown during task execution. The task failed and the goroutine ends, but the goWorker object can still be reused, so the deferred function first calls w.pool.workerCache.Put(w) to put the goWorker object back into the sync.Pool.

Next comes panic handling: if a panic handler is specified in the options, it is called directly; otherwise, the Logger set in the options logs the panic message, the stack, and so on.

Finally, w.pool.cond.Signal() is called to announce that a goWorker slot is now free: because of the panic, the number of actually running goWorkers is one less, and there may be tasks waiting in the pool. The sketch below shows a panic handler in use.
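
For example, a pool can be configured so that a panicking task does not crash the program. This sketch assumes the WithPanicHandler option described in the previous article:

package main

import (
  "fmt"
  "time"

  "github.com/panjf2000/ants"
)

func main() {
  p, _ := ants.NewPool(4, ants.WithPanicHandler(func(v interface{}) {
    fmt.Println("task panicked:", v) // called inside the worker's deferred recover
  }))
  defer p.Release()

  _ = p.Submit(func() { panic("boom") })
  time.Sleep(100 * time.Millisecond) // let the worker run and recover
}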

Submit a task

Next, submitting a task strings the whole process together. As we learned in the previous article, we call the pool object's Submit() method to submit a task:

func (p *Pool) Submit(task func()) error {
  if p.IsClosed() {
    return ErrPoolClosed
  }
  var w *goWorker
  if w = p.retrieveWorker(); w == nil {
    return ErrPoolOverload
  }
  w.task <- task
  return nil
}

First the method checks whether the pool is closed, then calls retrieveWorker() to get an idle worker, and finally sends the task to the worker's task channel. Here is the implementation of retrieveWorker():

func (p *Pool) retrieveWorker() (w *goWorker) {
  p.lock.Lock()

  w = p.workers.detach()
  if w != nil {
    p.lock.Unlock()
  } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
    p.lock.Unlock()
    spawnWorker()
  } else {
    if p.options.Nonblocking {
      p.lock.Unlock()
      return
    }
  Reentry:
    if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
      p.lock.Unlock()
      return
    }
    p.blockingNum++
    p.cond.Wait()
    p.blockingNum--
    var nw int
    if nw = p.Running(); nw == 0 {
      p.lock.Unlock()
      if !p.IsClosed() {
        spawnWorker()
      }
      return
    }
    if w = p.workers.detach(); w == nil {
      if nw < capacity {
        p.lock.Unlock()
        spawnWorker()
        return
      }
      goto Reentry
    }

    p.lock.Unlock()
  }
  return
}

This one is a bit more involved, so let's go through it step by step. First, p.workers.detach() is called to get a goWorker object. p.workers is a loopQueue or workerStack object, both of which implement the detach() method described earlier.

If a goWorker object is returned, there was an idle worker, and it is returned directly.

Otherwise, if the pool capacity has not been exhausted (i.e. the capacity is greater than the number of currently running goWorkers), spawnWorker() is called to create a new goWorker and run its run() method:

spawnWorker := func() {
  w = p.workerCache.Get().(*goWorker)
  w.run()
}

Otherwise, the pool capacity is exhausted. If the non-blocking option is set, the method returns directly. If a maximum blocking queue length is set and the number of tasks currently blocked waiting has reached it, the method also returns directly. Otherwise, the blocking-wait counter is incremented and p.cond.Wait() is called to wait.

Then, when goWorker.run() completes a task, it calls the pool's revertWorker() method to put the goWorker back:

func (p *Pool) revertWorker(worker *goWorker) bool {
  if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
    return false
  }
  worker.recycleTime = time.Now()
  p.lock.Lock()

  if p.IsClosed() {
    p.lock.Unlock()
    return false
  }

  err := p.workers.insert(worker)
  if err != nil {
    p.lock.Unlock()
    return false
  }

  p.cond.Signal()
  p.lock.Unlock()
  return true
}

Here the goWorker's recycleTime field is set, which is what expiry detection is based on, and the goWorker is put back into the pool. The insert() method of workers was analyzed earlier.

p.cond.Signal() is then called to wake up a goroutine waiting in retrieveWorker(). retrieveWorker() resumes, decrements the blocking-wait counter, and checks the current number of goWorkers (i.e. running goroutines). If that number is 0, the pool has most likely just been closed via Release(), so it checks whether the pool is closed and returns if it is; otherwise spawnWorker() creates a new goWorker and runs its run() method.

If the current number of goWorkers is not 0, p.workers.detach() is called to take an idle goWorker and return it. This can fail, because multiple goroutines may be waiting at the same time and only some of them obtain a goWorker when woken. If it fails and the capacity is not exhausted, a new goWorker is created directly; otherwise the blocking-wait logic is executed again.

There is a lot of locking and unlocking here, and mixed with the condition variable it can be hard to follow. The key is that p.cond.Wait() internally suspends the current goroutine and releases the lock by calling p.lock.Unlock(); that is why the p.lock.Lock() in revertWorker() can succeed. p.cond.Signal() or p.cond.Broadcast() then wakes a goroutine suspended in p.cond.Wait(), but the goroutine that called Signal()/Broadcast() must still release the lock itself. When the goroutine that called p.cond.Wait() wakes up, Wait() internally re-acquires the lock (p.lock.Lock()), so the logic after p.cond.Wait() still runs with the lock held. The standalone example below demonstrates this pattern.
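
A minimal sync.Cond example (independent of ants) showing Wait() releasing the lock while suspended and re-acquiring it on wake-up:

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  var mu sync.Mutex
  cond := sync.NewCond(&mu)
  ready := false

  go func() {
    mu.Lock()
    for !ready { // always re-check the condition after waking up
      cond.Wait() // releases mu while suspended, re-acquires it on wake-up
    }
    fmt.Println("woken up, lock held again")
    mu.Unlock()
  }()

  time.Sleep(100 * time.Millisecond)
  mu.Lock()
  ready = true
  cond.Signal() // wake one waiter; we still hold mu and must unlock it ourselves
  mu.Unlock()

  time.Sleep(100 * time.Millisecond)
}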

Finally, the original article includes an overall flow chart of this submit/retrieve/revert process.

Cleaning up expired goWorkers

The NewPool() function starts a goroutine that periodically cleans up expired goWorkers:

func (p *Pool) purgePeriodically() {
  heartbeat := time.NewTicker(p.options.ExpiryDuration)
  defer heartbeat.Stop()

  for range heartbeat.C {
    if p.IsClosed() {
      break
    }

    p.lock.Lock()
    expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
    p.lock.Unlock()

    for i := range expiredWorkers {
      expiredWorkers[i].task <- nil
      expiredWorkers[i] = nil
    }

    if p.Running() == 0 {
      p.cond.Broadcast()
    }
  }
}

If the pool is closed, the goroutine exits directly. The ExpiryDuration option sets the cleanup interval; if it is not set, the default of 1s is used:

// src/github.com/panjf2000/ants/pool.go
func NewPool(size int, options ...Option) (*Pool, error) {
  // ...
  if expiry := opts.ExpiryDuration; expiry < 0 {
    return nil, ErrInvalidPoolExpiry
  } else if expiry == 0 {
    opts.ExpiryDuration = DefaultCleanIntervalTime
  }
  // ...
}

// src/github.com/panjf2000/ants/pool.go
const (
  DefaultCleanIntervalTime = time.Second
)

Then, in each cleanup cycle, p.workers.retrieveExpiry() is called to remove the expired goWorkers. Since the goroutines started by these goWorkers are still blocked on their task channels, a nil value is sent to each channel; goWorker.run() receives the nil, returns, and the goroutine terminates, so goroutine leaks are avoided.

If all goWorkers have been cleaned up, there may still be goroutines blocked on p.cond.Wait() in retrieveWorker(), so p.cond.Broadcast() is called to wake them up.

Dynamic Capacity Change

The pool capacity can be modified dynamically at runtime by calling p.Tune(size int):

func (p *Pool) Tune(size int) {
  if capacity := p.Cap(); capacity == -1 || size <= 0 || size == capacity || p.options.PreAlloc {
    return
  }
  atomic.StoreInt32(&p.capacity, int32(size))
}

This simply stores the new capacity and does not affect currently running goWorkers; the capacity cannot be changed at all if the pre-allocation option was set.

The next time revertWorker() runs, the new capacity determines whether the worker can be put back; the next time retrieveWorker() runs, it determines whether a new goWorker can be created. A small usage sketch follows.
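
A small usage sketch (the capacities are arbitrary):

package main

import (
  "fmt"

  "github.com/panjf2000/ants"
)

func main() {
  p, _ := ants.NewPool(10)
  defer p.Release()

  fmt.Println(p.Cap()) // 10
  p.Tune(100)          // grow the pool
  fmt.Println(p.Cap()) // 100
  p.Tune(5)            // shrink: running workers are unaffected
  fmt.Println(p.Cap()) // 5
}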

Closing and restarting the Pool

When you are done with it, the Pool should be shut down to prevent goroutine leaks, by calling the pool object's Release() method:

func (p *Pool) Release() {
  atomic.StoreInt32(&p.state, CLOSED)
  p.lock.Lock()
  p.workers.reset()
  p.lock.Unlock()
  p.cond.Broadcast()
}

p.workers.reset() is called to terminate the goroutines in the loopQueue or workerStack and do some cleanup, and p.cond.Broadcast() is run once so that no goroutine stays blocked on p.cond.Wait().

workerStack.reset() and loopQueue.reset() are essentially the same: both terminate each worker goroutine by sending nil on its task channel, then reset the container's fields:

// loopQueue version
func (wq *loopQueue) reset() {
  if wq.isEmpty() {
    return
  }

Releasing:
  if w := wq.detach(); w != nil {
    w.task <- nil
    goto Releasing
  }
  wq.items = wq.items[:0]
  wq.size = 0
  wq.head = 0
  wq.tail = 0
}

// stack version
func (wq *workerStack) reset() {
  for i := 0; i < wq.len(); i++ {
    wq.items[i].task <- nil
    wq.items[i] = nil
  }
  wq.items = wq.items[:0]
}

You can also restart a released pool by calling Reboot():

func (p *Pool) Reboot() {
  if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
    go p.purgePeriodically()
  }
}

Since p.purgePeriodically() detects the pool shutdown after p.Release() and exits directly, the periodic cleanup goroutine needs to be restarted here, as the lifecycle sketch below shows.
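
A small lifecycle sketch (pool size and sleep durations are arbitrary):

package main

import (
  "fmt"
  "time"

  "github.com/panjf2000/ants"
)

func main() {
  p, _ := ants.NewPool(4)

  _ = p.Submit(func() { fmt.Println("before release") })
  time.Sleep(100 * time.Millisecond)

  p.Release()
  fmt.Println(p.Submit(func() {})) // pool is closed: ErrPoolClosed

  p.Reboot() // reopen the pool and restart the cleanup goroutine
  _ = p.Submit(func() { fmt.Println("after reboot") })
  time.Sleep(100 * time.Millisecond)
}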

PoolWithFunc and goWorkerWithFunc

In the previous article, we also introduced the other way to create a pool: NewPoolWithFunc(), which specifies a pool function. When submitting tasks later, p.Invoke() is called with an argument for that function. The pool and worker structures created this way are as follows (abridged):

type PoolWithFunc struct {
  workers []*goWorkerWithFunc
  poolFunc func(interface{})
}

type goWorkerWithFunc struct {
  pool *PoolWithFunc
  args chan interface{}
  recycleTime time.Time
}

Much like Pool and goWorker above, PoolWithFunc holds the function object passed in and uses an array to hold its workers. goWorkerWithFunc uses interface{} as the element type of its args channel, which is easy to understand: the function already exists, so only its argument needs to be passed in:

func (w *goWorkerWithFunc) run() {
  go func() {
    for args := range w.args {
      if args == nil {
        return
      }
      w.pool.poolFunc(args)
      if ok := w.pool.revertWorker(w); !ok {
        return
      }
    }
  }()
}

It receives function arguments from the channel and executes the function object saved in the pool.

Other details

The task buffer channel

Remember the code that sets up the sync.Pool object p.workerCache:

p.workerCache.New = func() interface{} {
  return &goWorker{
    pool: p,
    task: make(chan func(), workerChanCap),
  }
}

When there is no cached goWorker object in the sync.Pool, the New() method is called to create one. Notice that the task channel is created with workerChanCap as its capacity. This variable is defined in the file ants.go:

var (
  // workerChanCap determines whether the channel of a worker should be a buffered channel
  // to get the best performance. Inspired by fasthttp at
  // https://github.com/valyala/fasthttp/blob/master/workerpool.go#L139
  workerChanCap = func() int {
    // Use blocking channel if GOMAXPROCS=1.
    // This switches context from sender to receiver immediately,
    // which results in higher performance (under go1.5 at least).
    if runtime.GOMAXPROCS(0) == 1 {
      return 0
    }

    // Use non-blocking workerChan if GOMAXPROCS>1,
    // since otherwise the sender might be dragged down if the receiver is CPU-bound.
    return 1
  }()
)

I've kept the comments for comparison. ants borrows this from fasthttp, a well-known web framework. When GOMAXPROCS is 1 (i.e. only one operating system thread runs Go code), sending on an unbuffered task channel suspends the sending goroutine and switches execution directly to the receiving goroutine, which improves receive-side performance. When GOMAXPROCS is greater than 1, ants uses a buffered channel so that a CPU-bound receiving goroutine does not drag down the sender. Here is the corresponding fasthttp code:

// src/github.com/valyala/fasthttp/workerpool.go
var workerChanCap = func() int {
  // Use blocking workerChan if GOMAXPROCS=1.
  // This immediately switches Serve to WorkerFunc, which results
  // in higher performance (under go1.5 at least).
  if runtime.GOMAXPROCS(0) == 1 {
    return 0
  }

  // Use non-blocking workerChan if GOMAXPROCS>1,
  // since otherwise the Serve caller (Acceptor) may lag accepting
  // new connections if WorkerFunc is CPU-bound.
  return 1
}()

spinlocks

ants implements a spin lock using the atomic operation atomic.CompareAndSwapUint32(). Unlike other kinds of locks, a spin lock does not immediately go to sleep when it fails to acquire the lock; it keeps trying. For locks that are held only briefly this is a big performance win, because it avoids the thread switching caused by blocking lock operations:

type spinLock uint32

func (sl *spinLock) Lock() {
  backoff := 1
  for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
    for i := 0; i < backoff; i++ {
      runtime.Gosched()
    }
    backoff <<= 1
  }
}

func (sl *spinLock) Unlock() {
  atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
  return new(spinLock)
}

In addition, exponential backoff is used: first wait one cycle, yielding the processor to other goroutines via runtime.Gosched(); if the lock still cannot be acquired, wait 2 cycles, then 4, 8, 16, and so on. This avoids burning CPU time when the lock cannot be acquired for a while.

conclusion

The ants source code is short and clean, and it depends on no third-party libraries. Its many details and performance optimizations are all worth savoring. I strongly recommend reading the source; reading excellent source code can greatly improve your own coding skills.

If you find a fun and useful Go library, feel free to submit an issue on GitHub 😄

reference

  1. ants GitHub: github.com/panjf2000/a…
  2. go-daily-lib GitHub: github.com/darjun/go-d…

About me

My blog: darjun.github.io

Welcome to follow my WeChat official account [GoUpUp]; let's learn and improve together ~