Introduction

I previously wrote an article with the catchy title Handling 1 Million Requests per Minute with Go. It was based on a post by a foreign author that I had taken note of. That title also earned the article my best readership numbers — as expected, the views came largely on the strength of the title.

I came across another article today (original link at the end of this article). The two articles share the same principle: there is a batch of jobs, which can be processed concurrently by multiple workers through a worker pool.

Still, they differ in many ways, and the implementations are quite different.

First of all, in my last post I included a picture of the entire workflow. Its design had two traits:

  • Each worker just finishes its task; it does not care about the result, and does nothing with it.
  • As long as requests keep coming, the program never stops; there is no shutdown mechanism other than killing the process.
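As a rough sketch (my own simplified code, not the original article's), that fire-and-forget style looks something like this — results are thrown away, and only a counter records that work happened:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// drain starts n workers that consume jobs until the channel closes.
// Results are discarded; processed only counts that work happened.
// A simplified sketch of the fire-and-forget style described above.
func drain(n int, jobs <-chan int, processed *int64) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				_ = j * 2 // do the work, ignore the result
				atomic.AddInt64(processed, 1)
			}
		}()
	}
	return wg
}

func main() {
	jobs := make(chan int)
	var processed int64
	wg := drain(3, jobs, &processed)
	for i := 0; i < 5; i++ {
		jobs <- i
	}
	close(jobs) // in the design described above, this close never happens
	wg.Wait()
	fmt.Println(processed) // prints 5
}
```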

The differences in today's article are:

First, the data flows from generate (producing data) -> concurrent processing -> aggregation of results. The graph looks something like this,

Second, it can use context.Context to stop the worker pool.

Finally, as the code shows, it is not a worker pool in the traditional sense, as explained later.

The following diagram illustrates the overall process clearly.

Incidentally, the code implemented in this article is much simpler than the code for Handling 1 Million Requests per Minute with Go.

First, the Job definition.

package wpool

import (
	"context"
)

type JobID string
type jobType string
type jobMetadata map[string]interface{}

type ExecutionFn func(ctx context.Context, args interface{}) (interface{}, error)

type JobDescriptor struct {
	ID       JobID
	JType    jobType
	Metadata map[string]interface{}
}

type Result struct {
	Value      interface{}
	Err        error
	Descriptor JobDescriptor
}

type Job struct {
	Descriptor JobDescriptor
	ExecFn     ExecutionFn
	Args       interface{}
}

func (j Job) execute(ctx context.Context) Result {
	value, err := j.ExecFn(ctx, j.Args)
	if err != nil {
		return Result{
			Err:        err,
			Descriptor: j.Descriptor,
		}
	}
	return Result{
		Value:      value,
		Descriptor: j.Descriptor,
	}
}

To simplify a little: executing each Job returns a Result.

This is the core code.

package wpool

import (
	"context"
	"fmt"
	"sync"
)

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
	defer wg.Done()
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				return
			}
			results <- job.execute(ctx)
		case <-ctx.Done():
			fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
			results <- Result{
				Err: ctx.Err(),
			}
			return
		}
	}
}

type WorkerPool struct {
	workersCount int           // number of workers
	jobs         chan Job      // channel holding jobs to be processed
	results      chan Result   // channel holding the results
	Done         chan struct{} // closed once all workers have finished
}

func New(wcount int) WorkerPool {
	return WorkerPool{
		workersCount: wcount,
		jobs:         make(chan Job, wcount),
		results:      make(chan Result, wcount),
		Done:         make(chan struct{}),
	}
}

func (wp WorkerPool) Run(ctx context.Context) {
	var wg sync.WaitGroup
	for i := 0; i < wp.workersCount; i++ {
		wg.Add(1)
		go worker(ctx, &wg, wp.jobs, wp.results)
	}
	wg.Wait()
	close(wp.Done)
	close(wp.results)
}

func (wp WorkerPool) Results() <-chan Result {
	return wp.results
}

func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
	for i := range jobsBulk {
		wp.jobs <- jobsBulk[i]
	}
	close(wp.jobs)
}

The WorkerPool structure is simple. jobs is a buffered channel; each task is placed on jobs to be picked up by a worker.

results is also a channel, storing the Result of each job.

Initialize a worker pool with New, then call Run to start it.

func New(wcount int) WorkerPool {
	return WorkerPool{
		workersCount: wcount,
		jobs:         make(chan Job, wcount),
		results:      make(chan Result, wcount),
		Done:         make(chan struct{}),
	}
}
func (wp WorkerPool) Run(ctx context.Context) {
	var wg sync.WaitGroup

	for i := 0; i < wp.workersCount; i++ {
		wg.Add(1)
		go worker(ctx, &wg, wp.jobs, wp.results)
	}

	wg.Wait()
	close(wp.Done)
	close(wp.results)
}


The number of workers is passed in at initialization; each goroutine runs worker(ctx, &wg, wp.jobs, wp.results), and together they form the worker pool. Meanwhile, sync.WaitGroup lets us wait for all workers to finish, which marks the end of the pool's work — either because every job has been processed, or because the pool was stopped.

Where does the data for each job come from?

// job data source: push each job onto the jobs channel, then close it
func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
	for i := range jobsBulk {
		wp.jobs <- jobsBulk[i]
	}
	close(wp.jobs)
}

This corresponds to the work done by each worker:

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
	defer wg.Done()
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				return
			}
			results <- job.execute(ctx)
		case <-ctx.Done():
			fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
			results <- Result{
				Err: ctx.Err(),
			}
			return
		}
	}
}

Every worker tries to receive from the same jobs channel — a typical fan-out pattern. When a goroutine gets a job and processes it, it sends the result to the shared results channel — the fan-in pattern. And of course, we can stop each worker through context.Context.
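The fan-out/fan-in shape can be shown without any of the wpool types. This is my own minimal sketch (the name `fanOutFanIn` and the squaring work are illustrative assumptions): n workers read from one shared jobs channel and write to one shared results channel:

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutFanIn: n workers receive from one jobs channel (fan-out) and send
// to one results channel (fan-in). A minimal illustration of the pattern,
// independent of the wpool types above.
func fanOutFanIn(n int, inputs []int) []int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // fan-out: all workers share jobs
				results <- j * j // fan-in: all workers share results
			}
		}()
	}

	// Producer: feed the jobs channel, then close it.
	go func() {
		for _, v := range inputs {
			jobs <- v
		}
		close(jobs)
	}()

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []int
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	fmt.Println(len(fanOutFanIn(3, []int{1, 2, 3, 4, 5}))) // prints 5
}
```

Note that the output order is nondeterministic — whichever worker finishes first delivers first — which is exactly why each Result above carries its own Descriptor.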

And finally, reading the result set:

// Results exposes the results channel as receive-only
func (wp WorkerPool) Results() <-chan Result {
	return wp.results
}

So the overall test code is:

func TestWorkerPool(t *testing.T) {
	wp := New(workerCount)
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	go wp.GenerateFrom(testJobs())
	go wp.Run(ctx)

	for {
		select {
		case r, ok := <-wp.Results():
			if !ok {
				continue
			}
			i, err := strconv.ParseInt(string(r.Descriptor.ID), 10, 64)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			val := r.Value.(int)
			if val != int(i)*2 {
				t.Fatalf("wrong value %v; expected %v", val, int(i)*2)
			}
		case <-wp.Done:
			return
		default:
		}
	}
}

After reading the code, we can see this is not a traditional worker pool. Unlike the Handling 1 Million Requests per Minute with Go article, it does not initialize a real pool of workers and, whenever a job arrives, fetch a worker from the pool, hand the job to that worker for processing, and then return the worker to the pool to wait for its next use.
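For contrast, here is my own minimal sketch of that traditional style (not the original article's code; `startPool` and the doubling work are illustrative assumptions): each worker owns a private job channel and checks itself back into a shared pool after every job, so the dispatcher fetches a worker, hands it one job, and the worker returns to the pool for reuse:

```go
package main

import "fmt"

// startPool launches n workers, each owning a private job channel that it
// re-registers into the shared pool after every job. Workers are fetched
// from the pool and returned to it, rather than all reading one jobs
// channel. A simplified sketch of the traditional worker-pool style.
func startPool(n int, results chan<- int) chan chan int {
	pool := make(chan chan int, n)
	for i := 0; i < n; i++ {
		jobCh := make(chan int)
		go func(jobCh chan int) {
			for {
				pool <- jobCh // worker checks itself back into the pool
				j := <-jobCh  // wait for the next job
				results <- j * 2
			}
		}(jobCh)
	}
	return pool
}

func main() {
	results := make(chan int, 5)
	pool := startPool(2, results)
	for _, j := range []int{1, 2, 3, 4, 5} {
		w := <-pool // fetch an idle worker from the pool
		w <- j      // hand it exactly one job
	}
	sum := 0
	for i := 0; i < 5; i++ {
		sum += <-results
	}
	fmt.Println(sum) // prints 30
}
```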