The opening
I’ve written about it before, and it has a ringing name: Handling 1 Million Requests per Minute with Go. It was written by a foreign author, and I made a note of it. Start of also is this title, read quantity is my best, as expected the article is to rely on the title of…..
Today I came across another article (original at the end of the article). The principles of the two articles are similar: with a batch of jobs, the effect of concurrent processing of multiple workers can be achieved through the way of worker-pool.
They still have a lot of differences and implementations are quite different.
First of all, I put a picture in the last article, which is roughly the overall workflow of the last article.
- each
worker
Just finish the task, don’t care about the results, don’t deal with the results further. - As long as the request does not stop, the program will not stop, there is no control mechanism, unless the downtime.
The differences in this article are:
First of all, the data will come fromgenerate
(Production data)-> Concurrent processing data -> processing result aggregation. It looks something like this,
It can then use context.context to control the work pool to stop working.
Finally, through the code, you’ll see that it’s not a traditional worker-pool, as explained later.
The picture below can clearly express the overall process.
By the way, the code implemented in this article is much simpler than the Handling 1 Million Requests per Minute with Go code.
First, job.
package wpool
import (
"context"
)
type JobID string
type jobType string
type jobMetadata map[string]interface{}
type ExecutionFn func(ctx context.Context, args interface{}) (interface{}, error)
type JobDescriptor struct {
ID JobID
JType jobType
Metadata map[string]interface{}}type Result struct {
Value interface{}
Err error
Descriptor JobDescriptor
}
type Job struct {
Descriptor JobDescriptor
ExecFn ExecutionFn
Args interface{}}// Process the job logic and wrap the Result as Result
func (j Job) execute(ctx context.Context) Result {
value, err := j.ExecFn(ctx, j.Args)
iferr ! =nil {
return Result{
Err: err,
Descriptor: j.Descriptor,
}
}
return Result{
Value: value,
Descriptor: j.Descriptor,
}
}
Copy the code
This is a quick one. Finally, each job is returned as a Result.
This is the core code.
package wpool
import (
"context"
"fmt"
"sync"
)
// Each worker is running
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if! ok {return
}
results <- job.execute(ctx)
case <-ctx.Done():
fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
results <- Result{
Err: ctx.Err(),
}
return}}}type WorkerPool struct {
workersCount int / / number of the worker
jobs chan Job // Store the channel of the job
results chan Result // Finish processing the result set for each job
Done chan struct{} // Whether to end
}
func New(wcount int) WorkerPool {
return WorkerPool{
workersCount: wcount,
jobs: make(chan Job, wcount),
results: make(chan Result, wcount),
Done: make(chan struct{},}}func (wp WorkerPool) Run(ctx context.Context) {
var wg sync.WaitGroup
for i := 0; i < wp.workersCount; i++ {
wg.Add(1)
go worker(ctx, &wg, wp.jobs, wp.results)
}
wg.Wait()
close(wp.Done)
close(wp.results)
}
func (wp WorkerPool) Results(a) <-chan Result {
return wp.results
}
func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
for i, _ := range jobsBulk {
wp.jobs <- jobsBulk[i]
}
close(wp.jobs)
}
Copy the code
The entire WorkerPool structure is simple. Jobs is a buffer channel. Each task is put into jobs for processing by Woker.
Results is also a channel type that holds the Result generated by each job process.
Start by initializing a worker-pool with New, and then Run.
func New(wcount int) WorkerPool {
return WorkerPool{
workersCount: wcount,
jobs: make(chan Job, wcount),
results: make(chan Result, wcount),
Done: make(chan struct{},}}func (wp WorkerPool) Run(ctx context.Context) {
var wg sync.WaitGroup
for i := 0; i < wp.workersCount; i++ {
wg.Add(1)
go worker(ctx, &wg, wp.jobs, wp.results)
}
wg.Wait()
close(wp.Done)
close(wp.results)
}
Copy the code
When the number of workers is passed in during initialization, each G runs work(CTX, &WG,wp.jobs,wp.results) to form the worker-pool. At the same time, with sync.WaitGroup, we can wait for all workers to finish working, which means that the work pool is finished. Of course, it may be because the task processing is finished, or it may be stopped.
How does each job data source come from?
// Job data source to put each job into the Jobs Channel
func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
for i, _ := range jobsBulk {
wp.jobs <- jobsBulk[i]
}
close(wp.jobs)
}
Copy the code
Corresponding to the work of each worker,
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if! ok {return
}
results <- job.execute(ctx)
case <-ctx.Done():
fmt.Printf("cancelled worker. Error detail: %v\n", ctx.Err())
results <- Result{
Err: ctx.Err(),
}
return}}}Copy the code
Each worker tries to get data from the same Jobs, which is a typical fan-out pattern. When the corresponding G gets the job for processing, the processing result will be sent to the same Results channel, which is another fan-in mode. Of course, we can stop each worker through context. context.
And finally, to process the result set,
// Process the result set
func (wp WorkerPool) Results(a) <-chan Result {
return wp.results
}
Copy the code
So the overall test code is:
func TestWorkerPool(t *testing.T) {
wp := New(workerCount)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
go wp.GenerateFrom(testJobs())
go wp.Run(ctx)
for {
select {
case r, ok := <-wp.Results():
if! ok {continue
}
i, err := strconv.ParseInt(string(r.Descriptor.ID), 10.64)
iferr ! =nil {
t.Fatalf("unexpected error: %v", err)
}
val := r.Value.(int)
ifval ! =int(i)*2 {
t.Fatalf("wrong value %v; expected %v", val, int(i)*2)}case <-wp.Done:
return
default:}}}Copy the code
After looking at the code, we know that this is not a traditional worker-pool. It does not initialize a real worker-pool like the Handling 1 Million Requests per Minute with Go article. Once a job is received, it attempts to obtain a worker from the pool. The corresponding job is assigned to the work for processing. After the work is processed, the job is transferred to the work pool again for the next use.