Introduction to tunny

In a previous article I covered ants, a goroutine pool implementation. While browsing, I found another implementation: tunny. This is a good opportunity to study it and compare the two libraries. So let's get started.

Quick start

The code in this article uses Go Modules.

Create directory and initialize:

$ mkdir tunny && cd tunny
$ go mod init github.com/darjun/go-daily-lib/tunny

Fetch the tunny library with go get:

$ go get -u github.com/Jeffail/tunny

To make a convenient comparison with ants, let’s re-implement the ants example with Tunny:

const (
  DataSize    = 10000
  DataPerTask = 100
)

func main() {
  numCPUs := runtime.NumCPU()
  p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
    var sum int
    for _, n := range payload.([]int) {
      sum += n
    }
    return sum
  })
  defer p.Close()
  // ...
}

Using tunny is very simple. First, create a Pool with tunny.NewFunc().

The first parameter is the pool size, that is, how many workers (i.e., goroutines) run at the same time; here we set it to the number of logical CPUs. For CPU-bound tasks, a larger value brings no benefit and may even hurt performance due to frequent goroutine switching.

The second argument is a func(interface{}) interface{} that serves as the task handler. Data submitted later is processed by calling this function.

The pool needs to be closed when we are done with it; here defer p.Close() closes it before the program exits.

Then, generate test data, again 10000 random numbers, divided into 100 groups:

nums := make([]int, DataSize)
for i := range nums {
  nums[i] = rand.Intn(1000)
}

Process each group of data:

var wg sync.WaitGroup
wg.Add(DataSize / DataPerTask)
partialSums := make([]int, DataSize/DataPerTask)
for i := 0; i < DataSize/DataPerTask; i++ {
  go func(i int) {
    partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)
    wg.Done()
  }(i)
}

wg.Wait()

Call the p.Process() method with the task data, and an idle goroutine in the pool is selected to process it. Since we set the handler function above, the goroutine calls that function directly, passing in the slice as the argument.

tunny differs from ants in that its task processing is synchronous: after calling p.Process(), the current goroutine is suspended and is not woken up until the task completes. Because it is synchronous, p.Process() can return the result directly. This is also why the program above starts one goroutine per task when distributing work: if it did not, each p.Process() call would block until its task finished, and subsequent tasks would not run until the previous one completed, throwing away the advantage of concurrency.
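For contrast, here is a hypothetical serial version without the wrapper goroutines (my own sketch, not from the original code). It is still correct, but the groups are summed one after another and the pool buys us nothing:

// Serial sketch: each p.Process() call blocks until its task
// finishes, so no two groups are ever processed concurrently.
for i := 0; i < DataSize/DataPerTask; i++ {
  partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)
}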

Notice a small detail here: I pass the loop variable to the goroutine function as an argument. If this is not done, all goroutines share the outer i. The for loop will most likely have finished by the time the goroutines run, at which point i == DataSize/DataPerTask, and nums[i*DataPerTask : (i+1)*DataPerTask] is out of range, triggering a panic.
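Here is a sketch of that buggy variant, assuming pre-Go 1.22 loop-variable semantics (since Go 1.22 the loop variable is scoped per iteration and this pattern is safe):

// Buggy sketch: every closure captures the same variable i. By the
// time the goroutines run, i is most likely DataSize/DataPerTask,
// so the slice expression goes out of range and panics.
for i := 0; i < DataSize/DataPerTask; i++ {
  go func() {
    partialSums[i] = p.Process(nums[i*DataPerTask : (i+1)*DataPerTask]).(int)
    wg.Done()
  }()
}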

Final statistical data and verification results:

var sum int
for _, s := range partialSums {
  sum += s
}

var expect int
for _, num := range nums {
  expect += num
}
fmt.Printf("finish all tasks, result is %d expect:%d\n", sum, expect)

Run:

$ go run main.go
finish all tasks, result is 5010172 expect:5010172

Timeout

By default, p.Process() blocks until the task completes, even if no worker is currently idle. tunny also provides a variant of Process() with a timeout: ProcessTimed(). It takes a timeout interval; if no worker becomes idle, or the task has not completed, within that time, it aborts and returns an error.
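A minimal usage sketch (the payload value and the 50ms timeout are placeholders of mine); on timeout, ProcessTimed() returns tunny.ErrJobTimedOut, as we will see in the source later:

result, err := p.ProcessTimed(payload, 50*time.Millisecond)
if err == tunny.ErrJobTimedOut {
  // either no worker freed up in time, or the task ran too long
  return
}
fmt.Println(result)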

There are two cases of timeout:

  • No idle worker is available: all workers are busy with long-running tasks that cannot finish quickly;
  • The task itself is time-consuming.

Let's write a function that computes Fibonacci numbers using naive recursion, an inefficient implementation:

func fib(n int) int {
  if n <= 1 {
    return 1
  }

  return fib(n-1) + fib(n-2)
}

Let's look at time-consuming tasks first and create a Pool object. To make the effect more obvious, we add a time.Sleep() call to the handler:

p := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
  n := payload.(int)
  result := fib(n)
  time.Sleep(5 * time.Second)
  return result
})
defer p.Close()

Generate as many tasks as the pool capacity and call the p.ProcessTimed() method with a timeout of 1s:

var wg sync.WaitGroup
wg.Add(numCPUs)
for i := 0; i < numCPUs; i++ {
  go func(i int) {
    n := rand.Intn(30)
    result, err := p.ProcessTimed(n, time.Second)
    nowStr := time.Now().Format("2006-01-02 15:04:05")
    if err != nil {
      fmt.Printf("[%s]task(%d) failed:%v\n", nowStr, i, err)
    } else {
      fmt.Printf("[%s]fib(%d) = %d\n", nowStr, n, result)
    }
    wg.Done()
  }(i)
}

wg.Wait()

Because the handler sleeps for 5s, the tasks time out during execution. Run:

$ go run main.go 
[2021-06-10 16:36:26]task(7) failed:job request timed out
[2021-06-10 16:36:26]task(4) failed:job request timed out
[2021-06-10 16:36:26]task(1) failed:job request timed out
[2021-06-10 16:36:26]task(6) failed:job request timed out
[2021-06-10 16:36:26]task(5) failed:job request timed out
[2021-06-10 16:36:26]task(0) failed:job request timed out
[2021-06-10 16:36:26]task(3) failed:job request timed out
[2021-06-10 16:36:26]task(2) failed:job request timed out

All tasks time out within the same second.

Next we double the number of tasks and change the sleep in the handler to 990ms, so that the first batch of tasks can complete while later tasks fail with a timeout, either because no worker becomes idle in time or because execution takes too long. Run:

$ go run main.go
[2021-06-10 16:42:46]fib(11) = 144
[2021-06-10 16:42:46]fib(25) = 121393
[2021-06-10 16:42:46]fib(27) = 317811
[2021-06-10 16:42:46]fib(1) = 1
[2021-06-10 16:42:46]fib(18) = 4181
[2021-06-10 16:42:46]fib(29) = 832040
[2021-06-10 16:42:46]fib(17) = 2584
[2021-06-10 16:42:46]fib(20) = 10946
[2021-06-10 16:42:46]task(5) failed:job request timed out
[2021-06-10 16:42:46]task(14) failed:job request timed out
[2021-06-10 16:42:46]task(8) failed:job request timed out
[2021-06-10 16:42:46]task(7) failed:job request timed out
[2021-06-10 16:42:46]task(13) failed:job request timed out
[2021-06-10 16:42:46]task(12) failed:job request timed out
[2021-06-10 16:42:46]task(11) failed:job request timed out
[2021-06-10 16:42:46]task(6) failed:job request timed out

Context

Context is Go's tool for coordinating goroutines, and tunny supports a method that takes a context.Context parameter: ProcessCtx(). When the passed-in context becomes Done, the task is stopped; a context becomes Done when it times out or is canceled. Modifying the example above:

go func(i int) {
  n := rand.Intn(30)
  ctx, cancel := context.WithCancel(context.Background())
  if i%2 == 0 {
    go func() {
      time.Sleep(500 * time.Millisecond)
      cancel()
    }()
  }

  result, err := p.ProcessCtx(ctx, n)
  if err != nil {
     fmt.Printf("task(%d) failed:%v\n", i, err)
  } else {
     fmt.Printf("fib(%d) = %d\n", n, result)
  }
  wg.Done()
}(i)

Everything else stays the same; we call the p.ProcessCtx() method to execute the task, passing a cancelable context. For even-numbered tasks, we start a goroutine that cancels the context after 500ms. The output:

$ go run main.go
task(4) failed:context canceled
task(6) failed:context canceled
task(0) failed:context canceled
task(2) failed:context canceled
fib(27) = 317811
fib(25) = 121393
fib(1) = 1
fib(18) = 4181

We see that the even-numbered tasks are all canceled.

The source code

tunny has even less source code: fewer than 500 lines excluding tests and comments. Let's take a look. The Pool structure is as follows:

// src/github.com/Jeffail/tunny.go
type Pool struct {
  queuedJobs int64

  ctor    func(a) Worker
  workers []*workerWrapper
  reqChan chan workRequest

  workerMut sync.Mutex
}

The Pool structure has a ctor field, a function object whose return value implements the Worker interface:

type Worker interface {
  Process(interface{}) interface{}
  BlockUntilReady()
  Interrupt()
  Terminate()
}

Different methods of this interface are called at different stages of task execution. The most important one is Process(interface{}) interface{}, the function that actually performs the task. tunny provides a New() method to create a Pool; it requires us to construct the ctor function object ourselves. tunny also ships two default Worker implementations, closureWorker and callbackWorker:

type closureWorker struct {
  processor func(interface{}) interface{}
}

func (w *closureWorker) Process(payload interface{}) interface{} {
  return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
  f, ok := payload.(func())
  if !ok {
    return ErrJobNotFunc
  }
  f()
  return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

The tunny.NewFunc() method uses closureWorker:

func NewFunc(n int, f func(interface{}) interface{}) *Pool {
  return New(n, func() Worker {
    return &closureWorker{
      processor: f,
    }
  })
}

The closureWorker it creates uses the argument f directly as the task handler function.

The tunny.NewCallback() method uses callbackWorker:

func NewCallback(n int) *Pool {
  return New(n, func() Worker {
    return &callbackWorker{}
  })
}

The callbackWorker structure has no handler field; it can only be sent a function object with no parameters and no return value as the task, and its Process() method simply executes that function.
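A small usage sketch of my own (not from the original example): with NewCallback(), the payload itself is the task:

p := tunny.NewCallback(runtime.NumCPU())
defer p.Close()

// the submitted payload must be a func(); anything else makes
// Process() return ErrJobNotFunc
p.Process(func() {
  fmt.Println("work executed inside the pool")
})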

After the Pool object is created, the SetSize() method is called to set the number of workers. A corresponding number of goroutines are started in this method:

func (p *Pool) SetSize(n int) {
  p.workerMut.Lock()
  defer p.workerMut.Unlock()

  lWorkers := len(p.workers)
  if lWorkers == n {
    return
  }

  for i := lWorkers; i < n; i++ {
    p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
  }

  // Stop too many workers
  for i := n; i < lWorkers; i++ {
    p.workers[i].stop()
  }

  // Wait for the worker to stop
  for i := n; i < lWorkers; i++ {
    p.workers[i].join()
    // -----------------
  }
  p.workers = p.workers[:n]
}

SetSize() is also what handles growing and shrinking the pool. When growing, it creates the required number of new workers; when shrinking, it stops the surplus workers. Unlike ants, tunny's resize takes effect immediately.
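A short sketch, assuming a pool p created as in the quick-start example:

p.SetSize(2 * runtime.NumCPU()) // grow: starts additional workers immediately
p.SetSize(1)                    // shrink: stops and joins the surplus workers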

I think there is a problem with the line I have marked with -----------------. When shrinking, the underlying array does not change; after the workers slice is truncated, the trailing elements of the array are no longer reachable through the slice, yet the array still holds references to them, which is a form of memory leak. Wouldn't it be safer to add p.workers[i] = nil here?

The worker created here is actually a workerWrapper structure that wraps the real Worker:

// src/github.com/Jeffail/worker.go
type workerWrapper struct {
  worker        Worker
  interruptChan chan struct{}
  reqChan       chan<- workRequest
  closeChan     chan struct{}
  closedChan    chan struct{}
}

func newWorkerWrapper(
  reqChan chan<- workRequest,
  worker Worker,
) *workerWrapper {
  w := workerWrapper{
    worker:        worker,
    interruptChan: make(chan struct{}),
    reqChan:       reqChan,
    closeChan:     make(chan struct{}),
    closedChan:    make(chan struct{}),
  }

  go w.run()
  return &w
}

As soon as a workerWrapper is created, its run() method is launched in a new goroutine:

func (w *workerWrapper) run() {
  jobChan, retChan := make(chan interface{}), make(chan interface{})
  defer func() {
    w.worker.Terminate()
    close(retChan)
    close(w.closedChan)
  }()

  for {
    w.worker.BlockUntilReady()
    select {
    case w.reqChan <- workRequest{
      jobChan:       jobChan,
      retChan:       retChan,
      interruptFunc: w.interrupt,
    }:
      select {
      case payload := <-jobChan:
        result := w.worker.Process(payload)
        select {
        case retChan <- result:
        case <-w.interruptChan:
          w.interruptChan = make(chan struct{})
        }
      case _, _ = <-w.interruptChan:
        w.interruptChan = make(chan struct{})
      }
    case <-w.closeChan:
      return
    }
  }
}

Each worker goroutine tries to send a workRequest value into the w.reqChan channel. After the send succeeds, it receives the task data from jobChan, calls w.worker.Process() to execute the task, and finally sends the result into the retChan channel. There are several interactions here; combining this with the Process() method makes them clearer:

func (p *Pool) Process(payload interface{}) interface{} {
  request := <-p.reqChan
  request.jobChan <- payload
  payload = <-request.retChan
  return payload
}

With the irrelevant code removed, this is what remains. When we call the pool's Process() method, it first receives a workRequest from reqChan, then sends the task data into that request's jobChan, and finally receives the result from retChan. Combined with the run() flow above, the Pool interacts with a workerWrapper three times for one normal task.

Looking at the flow from Pool creation to workerWrapper creation, we can see that reqChan in the Pool structure is the same channel as reqChan in the workerWrapper structure. For a normal task, the interaction goes like this:

  • After startup, workerWrapper.run() blocks sending a workRequest into reqChan until a Process*() method of the Pool receives from that channel;
  • Process() takes the workRequest and sends the task data into its jobChan; run(), having sent to reqChan successfully, is waiting to receive from jobChan and picks the data up;
  • w.worker.Process() executes, and run() sends the result into retChan; Process(), having sent to jobChan successfully, is waiting to receive from retChan, receives the result, and returns;
  • workerWrapper.run() loops back and blocks on the w.reqChan <- send again, waiting for the next task.

Note that jobChan and retChan are both channels created inside the workerWrapper.run() method.
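To make the handshake concrete, here is a small self-contained program of my own that reproduces the same pattern outside tunny (the type and channel names merely mirror the library's):

package main

import "fmt"

type workRequest struct {
  jobChan chan int
  retChan chan int
}

func worker(reqChan chan<- workRequest) {
  jobChan, retChan := make(chan int), make(chan int)
  for {
    // 1st interaction: announce readiness by sending a request
    reqChan <- workRequest{jobChan: jobChan, retChan: retChan}
    // 2nd interaction: receive the task data
    n := <-jobChan
    // 3rd interaction: send back the result
    retChan <- n * n
  }
}

func main() {
  reqChan := make(chan workRequest)
  go worker(reqChan)

  req := <-reqChan           // wait for an idle worker
  req.jobChan <- 7           // hand over the task
  fmt.Println(<-req.retChan) // 49
}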

So how do timeouts work? Look at the implementation of ProcessTimed():

func (p *Pool) ProcessTimed(
  payload interface{},
  timeout time.Duration,
) (interface{}, error) {
  tout := time.NewTimer(timeout)

  var request workRequest
  select {
  case request = <-p.reqChan:
  case <-tout.C:
    return nil, ErrJobTimedOut
  }

  select {
  case request.jobChan <- payload:
  case <-tout.C:
    request.interruptFunc()
    return nil, ErrJobTimedOut
  }

  select {
  case payload = <-request.retChan:
  case <-tout.C:
    request.interruptFunc()
    return nil, ErrJobTimedOut
  }

  tout.Stop()
  return payload, nil
}

Likewise, the irrelevant code has been removed. First, a timer is created with the timeout passed in. Then come three select statements:

  • Waiting to receive a workRequest from p.reqChan, i.e., waiting for an idle worker;
  • Waiting to send the task data into jobChan, i.e., waiting for the worker to pick the task up from jobChan;
  • Waiting to receive the result from retChan, i.e., waiting for the worker to send the result into retChan.

In the first case, a timeout means all workers are busy, so the method simply returns a timeout error. In the latter two cases, the task has actually started but did not complete within the deadline, so its execution must be interrupted. That is what the calls to workRequest.interruptFunc(), i.e., the workerWrapper.interrupt() method, are for:

func (w *workerWrapper) interrupt() {
  close(w.interruptChan)
  w.worker.Interrupt()
}

This method simply closes the interruptChan channel and calls the Interrupt() method on the worker object, whose default implementation is empty.

Once interruptChan is closed, the worker goroutine abandons both the pending receive from jobChan and the pending send to retChan:

select {
case payload := <-jobChan:
  result := w.worker.Process(payload)
  select {
  case retChan <- result:
  case <-w.interruptChan:
    w.interruptChan = make(chan struct{})
  }
case _, _ = <-w.interruptChan:
  w.interruptChan = make(chan struct{})
}

The ProcessCtx() implementation is similar.
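Based on the ProcessTimed() pattern above, my condensed reading (not verbatim library code) is that each timer case is simply replaced by ctx.Done():

select {
case request = <-p.reqChan:
case <-ctx.Done():
  return nil, ctx.Err()
}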

Finally, calling workerWrapper.stop() closes the closeChan channel, which makes the for loop in workerWrapper.run() return; the deferred function then closes retChan and closedChan:

defer func() {
  w.worker.Terminate()
  close(retChan)
  close(w.closedChan)
}()

retChan must be closed here so that a Process*() call still waiting on retChan does not block forever.
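This relies on a basic property of Go channels, sketched below:

// a receive on a closed channel never blocks: it yields the zero
// value and false, which is what releases the waiting Process*() call
v, ok := <-retChan // after close(retChan): v == nil, ok == false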

The workerWrapper.join() method returns once the closedChan channel is closed:

func (w *workerWrapper) join() {
  <-w.closedChan
}

Call timing of the Worker interface methods:

  • Process(): when executing a task;
  • Interrupt(): when a task is canceled due to a timeout or a canceled context;
  • BlockUntilReady(): before each new task, in case resources need to be prepared;
  • Terminate(): in the deferred function in workerWrapper.run(), after the worker stops.

These moments are clearly visible in the code.
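To see all four hooks in one place, here is a hedged sketch of plugging a custom Worker into tunny.New(); myWorker is my own illustrative type, not part of the library:

package main

import (
  "fmt"

  "github.com/Jeffail/tunny"
)

type myWorker struct{}

func (w *myWorker) Process(payload interface{}) interface{} {
  return payload.(int) * 2
}
func (w *myWorker) BlockUntilReady() { /* e.g. acquire a connection */ }
func (w *myWorker) Interrupt()       { /* e.g. abort the in-flight call */ }
func (w *myWorker) Terminate()       { /* e.g. release resources */ }

func main() {
  p := tunny.New(4, func() tunny.Worker { return &myWorker{} })
  defer p.Close()
  fmt.Println(p.Process(21)) // 42
}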

Based on the source code, I drew a flow chart:

The interrupted flow is omitted from the figure.

tunny vs ants

tunny's design philosophy is quite different from that of ants:

tunny only supports synchronous task execution: although the task runs on another goroutine, the goroutine that submits it must wait for the result or a timeout and can do nothing else in the meantime. Because of this, tunny's design is slightly more complex, with several channels used to communicate with the goroutine executing the task in order to support timeouts and cancellation. Executing one task involves multiple channel exchanges, which costs some performance. On the other hand, the synchronous style is more intuitive to program against.

ants executes tasks in a completely asynchronous fashion and performs slightly better than tunny. But because it is asynchronous, it has no mechanism for task timeouts or cancellation, and collecting results requires writing extra code yourself.

Conclusion

This article introduced tunny, another goroutine pool implementation. It processes tasks synchronously, which makes the code more intuitive and gives stronger control over task execution, such as timeouts and cancellation, at the cost of a slightly more complex implementation. tunny's code is under 500 lines and is highly recommended reading.

If you find a fun or useful Go library, please go to GitHub and submit an issue 😄

References

  1. tunny on GitHub: github.com/Jeffail/tunny
  2. ants on GitHub: github.com/panjf2000/ants
  3. The code in this article on GitHub: github.com/darjun/go-daily-lib

About me

My blog: darjun.github.io

Welcome to follow my WeChat public account [GoUpUp]; let's learn together and make progress together~