Another simple use of Go's function types, and I have to say this piece of syntactic sugar is really nice.

The essence of concurrent task consumption is a set of long-running goroutines, each looping on a channel of func values and executing every func it receives from that channel.

The core code

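import "sync"

// Work gives the func type a name: one unit of work to execute.
type Work func()

// workChan holds the channels that receive work, one per goroutine.
var workChan []chan Work
var wg = &sync.WaitGroup{}

// Start launches the workers; size specifies the number of goroutines.
func Start(size int) {
	workChan = make([]chan Work, size) // allocate first: indexing a nil slice panics
	for i := 0; i < size; i++ {
		workChan[i] = make(chan Work)
		wg.Add(1) // register before the goroutine starts, so Wait cannot miss it
		go process(i)
	}
}

// process receives work from channel i until that channel is closed.
func process(i int) {
	defer wg.Done()
	for work := range workChan[i] { // receive work
		work() // execute the work
	}
}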

A slice of chan Work is defined, and multiple goroutines each receive from their own channel and execute the work as it arrives. A sync.WaitGroup tracks the goroutines so that a caller can block on Wait while they keep running.
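
To see how these pieces interact end to end, here is a minimal, self-contained sketch of the same pattern. It is not the article's module: the helper name `runAll`, the round-robin dispatch, and the close-then-wait shutdown are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Work is a unit of work, as in the article.
type Work func()

// runAll is a hypothetical helper: it starts size workers (size must be > 0),
// distributes the jobs among them, then closes the channels and waits
// for every worker to exit.
func runAll(size int, jobs []Work) {
	var wg sync.WaitGroup
	chans := make([]chan Work, size)
	for i := range chans {
		chans[i] = make(chan Work)
		wg.Add(1)
		go func(works <-chan Work) {
			defer wg.Done()
			for work := range works { // the loop ends when the channel is closed
				work()
			}
		}(chans[i])
	}
	for n, job := range jobs {
		chans[n%size] <- job // round-robin; blocks until that worker is free
	}
	for _, c := range chans {
		close(c)
	}
	wg.Wait()
}

func main() {
	var mu sync.Mutex
	done := 0
	jobs := make([]Work, 10)
	for i := range jobs {
		jobs[i] = func() {
			mu.Lock()
			done++
			mu.Unlock()
		}
	}
	runAll(3, jobs)
	fmt.Println("jobs executed:", done) // jobs executed: 10
}
```

Closing a channel is what lets a `range` loop over it terminate, which in turn lets `wg.Wait()` return once each worker has finished the job it was running.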

Adding work

Submitting work from outside simply means sending a Work value on a channel. To make this easier for callers, it is wrapped up as follows:

import (
	"errors"
	"fmt" // used by main
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
)

// Work is one unit of work to execute.
type Work func()

type workerGroup struct {
	workChan map[int]chan Work
	*sync.WaitGroup
	isStop atomic.Bool // written on shutdown, read by senders (Go 1.19+)
}

func NewWorkerGroup() *workerGroup {
	wg := &workerGroup{WaitGroup: &sync.WaitGroup{}}
	wg.workChan = make(map[int]chan Work)
	return wg
}

// SendWork sends a task to worker i.
// Note: a send that races with shutdown can still panic on a closed channel,
// so callers should stop sending before the group is stopped.
func (w *workerGroup) SendWork(work Work, i int) error {
	if w.isStop.Load() {
		return errors.New("the worker group has been closed")
	}
	works, ok := w.workChan[i]
	if !ok {
		return errors.New("error i")
	}
	works <- work
	return nil
}

// process runs worker i until its channel is closed.
func (w *workerGroup) process(i int) {
	defer w.Done()
	for work := range w.workChan[i] {
		work()
	}
}

// Start launches size workers.
func (w *workerGroup) Start(size int) {
	for i := 0; i < size; i++ {
		w.workChan[i] = make(chan Work)
		w.Add(1) // register before the goroutine starts
		go w.process(i)
	}
}

// OnStop blocks until an exit signal arrives, then closes every channel
// and waits for the workers to return.
func (w *workerGroup) OnStop() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
	<-sig
	w.isStop.Store(true)
	for _, works := range w.workChan {
		close(works)
	}
	w.Wait()
	log.Println("worker group succeed to close")
}

When external code needs to use this module, it just creates a worker group, starts it, and calls SendWork to submit work, as shown below:

func main() {
	wg := NewWorkerGroup()
	wg.Start(6)
	// Send before calling OnStop: OnStop blocks until an exit signal arrives.
	if err := wg.SendWork(func() {
		fmt.Println("work")
	}, 5); err != nil {
		fmt.Println(err)
	}
	wg.OnStop()
}

Conclusion

The code above is simple to use but leaves many edge cases unhandled. First, because the channels are unbuffered, SendWork blocks while the target worker is still busy with earlier work. Second, when the program exits, work that has not finished, or has not yet been accepted by a worker, may be lost. Third, if the work has a return value, the result has to be delivered somehow; whether to return it directly or deliver it asynchronously through a callback is an open question. There are surely more problems, and corrections are welcome.
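
Two of these points can be sketched briefly. The names `trySend`, `withResult`, and `errBusy` below are hypothetical helpers, not part of the module above. The first gives up instead of blocking when the worker cannot accept the task right away, and the second wraps a function that returns a value so the result arrives on a channel. This is one possible approach under those assumptions, not a definitive answer to the open question.

```go
package main

import (
	"errors"
	"fmt"
)

// Work is a unit of work, as in the article.
type Work func()

// errBusy is a hypothetical sentinel error for a rejected task.
var errBusy = errors.New("worker busy")

// trySend attempts to hand work to a worker without blocking.
func trySend(works chan<- Work, work Work) error {
	select {
	case works <- work:
		return nil
	default: // nobody is receiving and no buffer space is left
		return errBusy
	}
}

// withResult wraps fn so that its return value is delivered on a channel.
func withResult(fn func() int) (Work, <-chan int) {
	out := make(chan int, 1) // buffered so the worker never blocks on delivery
	return func() { out <- fn() }, out
}

func main() {
	works := make(chan Work, 1) // assumption: a buffer of 1 lets one task wait

	work, result := withResult(func() int { return 6 * 7 })
	fmt.Println(trySend(works, work))      // <nil>
	fmt.Println(trySend(works, func() {})) // worker busy

	go func() { // a single worker, started late on purpose
		for w := range works {
			w()
		}
	}()
	fmt.Println(<-result) // 42
	close(works)
}
```

With the unbuffered channels used in the article, `trySend` would succeed only when a worker is already waiting to receive, so a small buffer is usually paired with this style of send.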