From: juejin.cn/post/699169…

Code section

// ExecutionFn is the signature of the function a job runs. It is handed a
// context and the job's arguments, and returns either the produced value or
// an error.
type ExecutionFn func(ctx context.Context, args interface{}) (interface{}, error)
Copy the code
// Job
type Job struct {
   Descriptor JobDescriptor
   ExecFn     ExecutionFn
   Args       interface{}}Copy the code
/ / the Result
type Result struct {
   Value      interface{}
   Err        error
   Descriptor JobDescriptor
}
Copy the code
// execute returns the result of processing work
func (j Job) execute(ctx context.Context) Result {
   value, err := j.ExecFn(ctx, j.Args)
   / / make a mistake
   iferr ! =nil {
      return Result{
         Err:        err,
         Descriptor: j.Descriptor,
      }
   }
   return Result{
      Value:      value,
      Descriptor: j.Descriptor,
   }
}
Copy the code
// worker executes the task
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
   defer wg.Done()
   for {
      select {
      case job, ok := <-jobs:
         if! ok {return
         }
         results <- job.execute(ctx)
         //log.Println(job.Descriptor.ID)
      case <-ctx.Done():
         log.Printf("cancel worker,error detail: %v \n", ctx.Err())
         results <- Result{
            Err: ctx.Err(),
         }
         return}}}Copy the code
type WorkPool struct {
   workersCount int           / / number of the worker
   jobs         chan Job      / / working set
   results      chan Result   / / the result set
   Done         chan struct{} // End flag
}
Copy the code
func New(wc int) WorkPool {
   return WorkPool{
      workersCount: wc,
      jobs:         make(chan Job, wc),
      results:      make(chan Result, wc),
      Done:         make(chan struct{},}}Copy the code
/ / the Run Run
func (wp WorkPool) Run(ctx context.Context) {
   var wg sync.WaitGroup
   for i := 0; i < wp.workersCount; i++ {
      wg.Add(1)
      go worker(ctx, &wg, wp.jobs, wp.results)
   }
   wg.Wait()
   close(wp.Done)
   close(wp.results)
}
Copy the code
// ShutDown stops the pool from accepting work by closing the jobs channel.
// Workers return once they observe the closed channel, after which Run closes
// Done and results.
//
// ShutDown deliberately leaves Done and results alone: Run already closes
// both, and closing a channel twice, or closing results while a worker is
// still sending on it, panics. It must be called at most once, and no job may
// be added afterwards, because sending on the closed jobs channel panics.
func (wp WorkPool) ShutDown() {
	close(wp.jobs)
}
Copy the code
// Results returns the pool's results channel as a receive-only channel.
func (wp WorkPool) Results() <-chan Result {
	return wp.results
}
Copy the code
// GenerateFromSlice sends every job in jobsBulk, in order, on the pool's jobs
// channel. The channel is left open, so more jobs can be submitted afterwards.
func (wp WorkPool) GenerateFromSlice(jobsBulk []Job) {
	for _, job := range jobsBulk {
		wp.jobs <- job
	}
}
Copy the code
// AddJob sends a single job on the pool's jobs channel.
func (wp WorkPool) AddJob(job Job) {
	wp.jobs <- job
}
Copy the code

A test that prints the result set

func TestWorkPool(t *testing.T) {
   wp := New(10)
   ctx, cancel := context.WithCancel(context.TODO())
   defer cancel()

   go wp.GenerateFromSlice(testJob())
   //go wp.GenerateFromSlice(testJob())
   //go wp.GenerateFromSlice(testJob())
   go wp.AddJob(generateJob())

   go wp.Run(ctx)

   // Close in 10 seconds
   go func(a) {
      time.AfterFunc(10*time.Second, wp.ShutDown)
   }()
   for {
      select {
      case r, ok := <-wp.Results():
         if! ok {continue
         }
         fmt.Println(r.Value)
      case <-wp.Done:
         return
      default:}}}Copy the code