Another simple use of Go's func type, and I have to say this bit of syntactic sugar is really nice.
The essence of concurrent task consumption is a set of long-running goroutines, each listening on a channel of funcs and executing every func it receives from that channel.
The core code
import "sync"

type Work func() // Work is a named func type

var workChan []chan Work // channels used to receive work
var wg = &sync.WaitGroup{}

// Start launches the workers; size is the number of long-running goroutines.
func Start(size int) {
	workChan = make([]chan Work, size) // allocate the slice before indexing it
	for i := 0; i < size; i++ {
		workChan[i] = make(chan Work)
		wg.Add(1) // register the worker before its goroutine starts
		go process(i)
	}
}

func process(i int) {
	defer wg.Done()
	for work := range workChan[i] { // receive work until the channel is closed
		work() // execute the work
	}
}
By defining a slice of chan Work, multiple goroutines each receive from their own channel and execute the work as soon as it arrives. sync.WaitGroup lets the caller wait on the goroutines, so the program does not exit while they keep running.
Add the work
Sending work from outside is really just sending a func to a channel. To make this easier for callers, it is wrapped up as follows:
import (
	"errors"
	"fmt" // used by main
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
)

type Work func()

type workerGroup struct {
	workChan map[int]chan Work
	*sync.WaitGroup
	mu     sync.RWMutex // guards isStop and the closing of the channels
	isStop bool
}

func NewWorkerGroup() *workerGroup {
	wg := &workerGroup{WaitGroup: &sync.WaitGroup{}}
	wg.workChan = make(map[int]chan Work)
	return wg
}

// SendWork sends a task to worker i.
func (w *workerGroup) SendWork(work Work, i int) error {
	w.mu.RLock() // held during the send so the channel cannot be closed under us
	defer w.mu.RUnlock()
	if w.isStop {
		return errors.New("the worker group has been closed")
	}
	works, ok := w.workChan[i]
	if !ok {
		return errors.New("invalid worker index")
	}
	works <- work
	return nil
}

// process runs work from its channel until the channel is closed.
func (w *workerGroup) process(works chan Work) {
	defer w.Done()
	for work := range works {
		work()
	}
}

func (w *workerGroup) Start(size int) {
	for i := 0; i < size; i++ {
		works := make(chan Work)
		w.workChan[i] = works
		w.Add(1) // register the worker before its goroutine starts
		go w.process(works)
	}
}

// OnStop blocks until an interrupt or termination signal arrives,
// then closes every channel and waits for the workers to return.
func (w *workerGroup) OnStop() {
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
	<-sig

	w.mu.Lock()
	w.isStop = true
	for _, works := range w.workChan {
		close(works) // ends the range loop in process
	}
	w.mu.Unlock()

	w.Wait()
	log.Println("worker group closed successfully")
}
When external code needs this module, it only has to create a worker group, start it, and call SendWork to send work, as shown below:
func main() {
	wg := NewWorkerGroup()
	wg.Start(6)
	// Send the work to worker 5 before calling OnStop, because OnStop blocks.
	if err := wg.SendWork(func() {
		fmt.Println("work")
	}, 5); err != nil {
		fmt.Println(err)
	}
	wg.OnStop()
}
Conclusion
The code above is simple to use but ignores many edge cases. First, the channels are unbuffered, so SendWork blocks whenever the target goroutine is still busy with earlier work. Second, if the process exits, any unfinished work in the goroutines exits with it. Third, if the work has a return value, the caller has to be told the result; whether to return it directly or deliver it asynchronously through a callback is an open question. There are more issues besides these, and corrections are welcome.
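As one possible direction for the blocking-send and return-value questions, here is a minimal sketch. It is not part of the module above: trySend, withResult and errQueueFull are hypothetical names, and the buffered channel of capacity 1 is an assumption chosen only to keep the demo small. trySend uses select with a default case so the caller gets an error instead of blocking, and withResult wraps a value-returning func as a Work and hands the result back over a channel.

```go
package main

import (
	"errors"
	"fmt"
)

// Work mirrors the func type used in the article.
type Work func()

// errQueueFull is a hypothetical error returned when the queue has no room.
var errQueueFull = errors.New("work queue is full")

// trySend (hypothetical helper) enqueues work without blocking the caller.
func trySend(works chan<- Work, work Work) error {
	select {
	case works <- work:
		return nil
	default: // no free buffer slot and no receiver ready
		return errQueueFull
	}
}

// withResult (hypothetical helper) wraps a value-returning func as a Work
// and returns a channel that delivers its result once.
func withResult(f func() int) (Work, <-chan int) {
	out := make(chan int, 1) // buffered so the worker never blocks on delivery
	return func() { out <- f() }, out
}

func main() {
	works := make(chan Work, 1) // assumption: one task may wait in the queue

	work, result := withResult(func() int { return 6 * 7 })
	fmt.Println(trySend(works, work))      // accepted: the buffer has room
	fmt.Println(trySend(works, func() {})) // rejected: the buffer is full

	done := make(chan struct{})
	go func() { // a single worker, shaped like process in the article
		defer close(done)
		for w := range works {
			w()
		}
	}()
	close(works) // the worker drains the queued task, then stops
	<-done
	fmt.Println(<-result)
}
```

Whether a full queue should reject work, block with a timeout, or grow is a design choice that depends on the caller; a callback parameter would be an equally reasonable alternative to the result channel.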