From: juejin. Cn/post / 699169…
Code section
// ExecutionFn Method of job execution
type ExecutionFn func(ctx context.Context, args interface{}) (interface{}, error)
Copy the code
// Job
type Job struct {
Descriptor JobDescriptor
ExecFn ExecutionFn
Args interface{}}Copy the code
/ / the Result
type Result struct {
Value interface{}
Err error
Descriptor JobDescriptor
}
Copy the code
// execute returns the result of processing work
func (j Job) execute(ctx context.Context) Result {
value, err := j.ExecFn(ctx, j.Args)
/ / make a mistake
iferr ! =nil {
return Result{
Err: err,
Descriptor: j.Descriptor,
}
}
return Result{
Value: value,
Descriptor: j.Descriptor,
}
}
Copy the code
// worker executes the task
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if! ok {return
}
results <- job.execute(ctx)
//log.Println(job.Descriptor.ID)
case <-ctx.Done():
log.Printf("cancel worker,error detail: %v \n", ctx.Err())
results <- Result{
Err: ctx.Err(),
}
return}}}Copy the code
type WorkPool struct {
workersCount int / / number of the worker
jobs chan Job / / working set
results chan Result / / the result set
Done chan struct{} // End flag
}
Copy the code
func New(wc int) WorkPool {
return WorkPool{
workersCount: wc,
jobs: make(chan Job, wc),
results: make(chan Result, wc),
Done: make(chan struct{},}}Copy the code
/ / the Run Run
func (wp WorkPool) Run(ctx context.Context) {
var wg sync.WaitGroup
for i := 0; i < wp.workersCount; i++ {
wg.Add(1)
go worker(ctx, &wg, wp.jobs, wp.results)
}
wg.Wait()
close(wp.Done)
close(wp.results)
}
Copy the code
// ShutDown Shuts down the working pool
func (wp WorkPool) ShutDown(a) {
wp.Done <- struct{} {}// Resource release
defer close(wp.Done)
defer close(wp.results)
defer close(wp.jobs)
}
Copy the code
// Results Returns the result
func (wp WorkPool) Results(a) <-chan Result {
return wp.results
}
Copy the code
// GenerateFromSlice generates a Job
func (wp WorkPool) GenerateFromSlice(jobsBulk []Job) {
for i, _ := range jobsBulk {
wp.jobs <- jobsBulk[i]
}
//close(wp.jobs)
return
}
Copy the code
func (wp WorkPool) AddJob(job Job) {
wp.jobs <- job
return
}
Copy the code
Test to print out the result set
func TestWorkPool(t *testing.T) {
wp := New(10)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
go wp.GenerateFromSlice(testJob())
//go wp.GenerateFromSlice(testJob())
//go wp.GenerateFromSlice(testJob())
go wp.AddJob(generateJob())
go wp.Run(ctx)
// Close in 10 seconds
go func(a) {
time.AfterFunc(10*time.Second, wp.ShutDown)
}()
for {
select {
case r, ok := <-wp.Results():
if! ok {continue
}
fmt.Println(r.Value)
case <-wp.Done:
return
default:}}}Copy the code