Preface
This article examines gost's GenericTaskPool.
GenericTaskPool
gost/sync/task_pool.go
// GenericTaskPool represents an generic task pool.
type GenericTaskPool interface {
	// AddTask wait idle worker add task
	AddTask(t task) bool
	// AddTaskAlways add task to queues or do it immediately
	AddTaskAlways(t task)
	// AddTaskBalance add task to idle queue
	AddTaskBalance(t task)
	// Close use to close the task pool
	Close()
	// IsClosed use to check pool status.
	IsClosed() bool
}
The GenericTaskPool interface defines five methods: AddTask, AddTaskAlways, AddTaskBalance, Close, and IsClosed.
TaskPool
gost/sync/task_pool.go
type TaskPool struct {
	TaskPoolOptions

	idx    uint32 // round robin index
	qArray []chan task
	wg     sync.WaitGroup

	once sync.Once
	done chan struct{}
}

// return false when the pool is stop
func (p *TaskPool) AddTask(t task) (ok bool) {
	idx := atomic.AddUint32(&p.idx, 1)
	id := idx % uint32(p.tQNumber)

	select {
	case <-p.done:
		return false
	default:
		p.qArray[id] <- t
		return true
	}
}

func (p *TaskPool) AddTaskAlways(t task) {
	id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)

	select {
	case p.qArray[id] <- t:
		return
	default:
		goSafely(t)
	}
}

// do it immediately when no idle queue
func (p *TaskPool) AddTaskBalance(t task) {
	length := len(p.qArray)

	// try len/2 times to lookup idle queue
	for i := 0; i < length/2; i++ {
		select {
		case p.qArray[rand.Intn(length)] <- t:
			return
		default:
			continue
		}
	}

	goSafely(t)
}

// check whether the session has been closed.
func (p *TaskPool) IsClosed() bool {
	select {
	case <-p.done:
		return true
	default:
		return false
	}
}

func (p *TaskPool) Close() {
	p.stop()
	p.wg.Wait()
	for i := range p.qArray {
		close(p.qArray[i])
	}
}
TaskPool embeds TaskPoolOptions and defines the idx, qArray, wg, once, and done fields; it implements the GenericTaskPool interface. AddTask atomically increments idx and takes it modulo tQNumber to pick a queue id; it returns false if done is closed, otherwise it sends the task to qArray[id], blocking while that queue is full, and returns true. AddTaskAlways picks a queue the same way but never checks done: if the queue cannot accept the task immediately, the task is handed to goSafely instead. AddTaskBalance makes len(qArray)/2 non-blocking attempts to send the task to a randomly chosen queue; if every attempt fails, it falls back to goSafely. IsClosed reports whether done has been closed. Close calls stop, waits on wg.Wait(), and then closes each channel in qArray in turn.
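The round-robin selection and the non-blocking fallback described above can be illustrated with a small standalone sketch. It is not gost's code: miniPool, nextQueue, addAlways, and the fallback parameter are hypothetical names chosen for illustration, and the fallback function only stands in for goSafely, whose implementation is not shown in this article.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// task mirrors the func() shape of the tasks the pool accepts.
type task func()

// miniPool is a hypothetical, stripped-down stand-in for TaskPool.
type miniPool struct {
	idx    uint32 // round robin index
	queues []chan task
}

// nextQueue atomically advances the counter and maps it onto a queue index.
func (p *miniPool) nextQueue() int {
	return int(atomic.AddUint32(&p.idx, 1) % uint32(len(p.queues)))
}

// addAlways tries a non-blocking send to the chosen queue. If that queue is
// full, it passes the task to fallback (assumed to play the role of goSafely)
// and reports false.
func (p *miniPool) addAlways(t task, fallback func(task)) (queued bool) {
	select {
	case p.queues[p.nextQueue()] <- t:
		return true
	default:
		fallback(t)
		return false
	}
}

func main() {
	p := &miniPool{queues: []chan task{make(chan task, 1), make(chan task, 1)}}

	var wg sync.WaitGroup
	var ranDirectly int32
	// spawn runs the task in its own goroutine, as a fallback might.
	spawn := func(t task) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t()
		}()
	}

	for i := 0; i < 3; i++ {
		queued := p.addAlways(func() { atomic.AddInt32(&ranDirectly, 1) }, spawn)
		fmt.Println("queued:", queued)
	}
	wg.Wait()
	fmt.Println("tasks run by the fallback:", atomic.LoadInt32(&ranDirectly))
}
```

With two one-slot queues and no workers draining them, the first two tasks fill the queues and the third cannot be queued, so it goes to the fallback.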
NewTaskPool
gost/sync/task_pool.go
func NewTaskPool(opts ...TaskPoolOption) GenericTaskPool {
	var tOpts TaskPoolOptions
	for _, opt := range opts {
		opt(&tOpts)
	}

	tOpts.validate()

	p := &TaskPool{
		TaskPoolOptions: tOpts,
		qArray:          make([]chan task, tOpts.tQNumber),
		done:            make(chan struct{}),
	}

	for i := 0; i < p.tQNumber; i++ {
		p.qArray[i] = make(chan task, p.tQLen)
	}
	p.start()

	return p
}
NewTaskPool applies the given TaskPoolOption values to a TaskPoolOptions, validates it, creates tQNumber queues each buffered to tQLen, and starts the workers.
TaskPoolOption
gost/sync/options.go
const (
	defaultTaskQNumber = 10
	defaultTaskQLen    = 128
)

/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////

// TaskPoolOptions is optional settings for task pool
type TaskPoolOptions struct {
	tQLen      int // task queue length. buffer size per queue
	tQNumber   int // task queue number. number of queue
	tQPoolSize int // task pool size. number of workers
}

func (o *TaskPoolOptions) validate() {
	if o.tQPoolSize < 1 {
		panic(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
	}

	if o.tQLen < 1 {
		o.tQLen = defaultTaskQLen
	}

	if o.tQNumber < 1 {
		o.tQNumber = defaultTaskQNumber
	}

	if o.tQNumber > o.tQPoolSize {
		o.tQNumber = o.tQPoolSize
	}
}

type TaskPoolOption func(*TaskPoolOptions)

// WithTaskPoolTaskPoolSize set @size of the task queue pool size
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
	return func(o *TaskPoolOptions) {
		o.tQPoolSize = size
	}
}

// WithTaskPoolTaskQueueLength set @length of the task queue length
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
	return func(o *TaskPoolOptions) {
		o.tQLen = length
	}
}

// WithTaskPoolTaskQueueNumber set @number of the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
	return func(o *TaskPoolOptions) {
		o.tQNumber = number
	}
}
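TaskPoolOptions defines the tQLen, tQNumber, and tQPoolSize fields. Its validate method panics if tQPoolSize is less than 1, falls back to the defaults for tQLen (128) and tQNumber (10) when they are less than 1, and caps tQNumber at tQPoolSize. The WithTaskPoolTaskPoolSize, WithTaskPoolTaskQueueLength, and WithTaskPoolTaskQueueNumber functions each return a TaskPoolOption that sets the corresponding field.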
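To see how the option functions and the validation rules interact, here is a minimal standalone sketch of the same functional-options pattern. The names poolOptions, poolOption, withPoolSize, withQueueNumber, and build are hypothetical, and unlike validate this sketch returns an error instead of panicking so that the failure case is easy to observe.

```go
package main

import "fmt"

// poolOptions is a hypothetical analogue of TaskPoolOptions.
type poolOptions struct {
	queueLen    int // buffer size per queue
	queueNumber int // number of queues
	poolSize    int // number of workers
}

// poolOption mutates a poolOptions value, like TaskPoolOption.
type poolOption func(*poolOptions)

func withPoolSize(n int) poolOption {
	return func(o *poolOptions) { o.poolSize = n }
}

func withQueueNumber(n int) poolOption {
	return func(o *poolOptions) { o.queueNumber = n }
}

// build applies the options in order and then normalizes the result using
// the same rules as validate (defaults 128 and 10, queue number capped at
// the pool size). Returning an error is a deviation made for this sketch.
func build(opts ...poolOption) (poolOptions, error) {
	var o poolOptions
	for _, opt := range opts {
		opt(&o)
	}
	if o.poolSize < 1 {
		return o, fmt.Errorf("illegal pool size %d", o.poolSize)
	}
	if o.queueLen < 1 {
		o.queueLen = 128
	}
	if o.queueNumber < 1 {
		o.queueNumber = 10
	}
	if o.queueNumber > o.poolSize {
		o.queueNumber = o.poolSize
	}
	return o, nil
}

func main() {
	o, err := build(withPoolSize(4))
	fmt.Printf("%+v, err=%v\n", o, err)

	o, err = build(withPoolSize(4), withQueueNumber(2))
	fmt.Printf("%+v, err=%v\n", o, err)

	_, err = build()
	fmt.Println("no options:", err)
}
```

The capping rule is what guarantees that there are never more queues than workers: with a pool size of 4 and no explicit queue number, the default of 10 is reduced to 4.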
start
gost/sync/task_pool.go
func (p *TaskPool) start() {
	for i := 0; i < p.tQPoolSize; i++ {
		p.wg.Add(1)
		workerID := i
		q := p.qArray[workerID%p.tQNumber]
		p.safeRun(workerID, q)
	}
}

func (p *TaskPool) safeRun(workerID int, q chan task) {
	gxruntime.GoSafely(nil, false,
		func() {
			err := p.run(int(workerID), q)
			if err != nil {
				// log error to stderr
				log.Printf("gost/TaskPool.run error: %s", err.Error())
			}
		},
		nil,
	)
}
The start method launches tQPoolSize workers, binding worker i to queue qArray[i%tQNumber] through safeRun. The safeRun method uses gxruntime.GoSafely to call p.run(int(workerID), q) and logs any error that run returns.
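The worker-to-queue binding in start is plain modulo arithmetic, which a tiny sketch makes concrete. The helper workersPerQueue is a hypothetical name introduced here; it only counts how many workers would read from each queue for a given pool size and queue number.

```go
package main

import "fmt"

// workersPerQueue counts how many workers read each queue when worker i is
// bound to queue i % queueNumber, the mapping used in start.
func workersPerQueue(poolSize, queueNumber int) []int {
	counts := make([]int, queueNumber)
	for workerID := 0; workerID < poolSize; workerID++ {
		counts[workerID%queueNumber]++
	}
	return counts
}

func main() {
	// Five workers over two queues: workers 0, 2, 4 share queue 0 and
	// workers 1, 3 share queue 1.
	fmt.Println(workersPerQueue(5, 2))
}
```

Because validate caps tQNumber at tQPoolSize, every queue ends up with at least one worker under this mapping.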
run
gost/sync/task_pool.go
// worker
func (p *TaskPool) run(id int, q chan task) error {
	defer p.wg.Done()

	var (
		ok bool
		t  task
	)

	for {
		select {
		case <-p.done:
			if 0 < len(q) {
				return fmt.Errorf("task worker %d exit now while its task buffer length %d is greater than 0",
					id, len(q))
			}
			return nil

		case t, ok = <-q:
			if ok {
				func() {
					defer func() {
						if r := recover(); r != nil {
							fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
								time.Now(), r, string(debug.Stack()))
						}
					}()
					t()
				}()
			}
		}
	}
}
The run method loops over a select. When p.done is readable it exits, returning an error if its queue still holds unprocessed tasks and nil otherwise. When it receives a task from the queue, it executes the task inside a function that recovers from panics and writes the panic value and stack trace to stderr.
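The key property of this loop is that a panicking task does not kill the worker. The following standalone sketch shows that behavior in isolation; runSafely and worker are hypothetical names, and the sketch leaves out the leftover-queue check that run performs on shutdown.

```go
package main

import (
	"fmt"
	"sync"
)

type task func()

// runSafely executes t and returns the recovered panic value, or nil if t
// returned normally, so the calling goroutine survives a panicking task.
func runSafely(t task) (recovered interface{}) {
	defer func() { recovered = recover() }()
	t()
	return nil
}

// worker executes tasks from q until done is closed.
func worker(q <-chan task, done <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-done:
			return
		case t, ok := <-q:
			if ok {
				if r := runSafely(t); r != nil {
					fmt.Println("task panicked:", r)
				}
			}
		}
	}
}

func main() {
	q := make(chan task)
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go worker(q, done, &wg)

	finished := make(chan struct{})
	q <- func() { panic("boom") }   // recovered inside the worker
	q <- func() { close(finished) } // still executed by the same worker
	<-finished

	close(done)
	wg.Wait()
	fmt.Println("worker exited")
}
```

The second task only runs if the worker survived the first one, which is why the program can wait on finished without hanging.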
taskPoolSimple
gost/sync/task_pool.go
type taskPoolSimple struct {
	work chan task     // task channel
	sem  chan struct{} // gr pool size

	wg sync.WaitGroup

	once sync.Once
	done chan struct{}
}
taskPoolSimple defines the work, sem, wg, once, and done fields; it also implements the GenericTaskPool interface. AddTask first checks done and then writes to either work or sem. AddTaskAlways falls back to goSafely when no channel in its select is ready. AddTaskBalance simply calls AddTaskAlways. IsClosed reads done. Close calls stop and then wg.Wait().
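The methods of taskPoolSimple are not listed in this article, so the following is only a sketch of the general pattern that a work channel plus a semaphore channel suggests, not gost's implementation. The names simplePool, newSimplePool, add, worker, and close are hypothetical, and the done channel is omitted: shutdown is modeled by closing work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type task func()

// simplePool is a hypothetical sketch: work hands tasks to running workers,
// and the capacity of sem bounds how many workers may be started.
type simplePool struct {
	work chan task
	sem  chan struct{}
	wg   sync.WaitGroup
}

func newSimplePool(size int) *simplePool {
	return &simplePool{
		work: make(chan task),
		sem:  make(chan struct{}, size),
	}
}

// add gives t to an idle worker if one is receiving, or starts a new worker
// if sem still has room. With all workers busy and sem full, it blocks.
func (p *simplePool) add(t task) {
	select {
	case p.work <- t:
	case p.sem <- struct{}{}:
		p.wg.Add(1)
		go p.worker(t)
	}
}

// worker runs its first task, then keeps serving work until it is closed.
func (p *simplePool) worker(t task) {
	defer p.wg.Done()
	t()
	for next := range p.work {
		next()
	}
}

// close stops accepting tasks and waits for the workers to finish.
func (p *simplePool) close() {
	close(p.work)
	p.wg.Wait()
}

func main() {
	p := newSimplePool(2)
	var n int64
	for i := 0; i < 10; i++ {
		p.add(func() { atomic.AddInt64(&n, 1) })
	}
	p.close()
	fmt.Println("tasks run:", atomic.LoadInt64(&n))
}
```

In this sketch workers are created lazily, one per free sem slot, rather than all up front as TaskPool.start does.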
Example
gost/sync/task_pool_test.go
func TestTaskPool(t *testing.T) {
	numCPU := runtime.NumCPU()
	//taskCnt := int64(numCPU * numCPU * 100)

	tp := NewTaskPool(
		WithTaskPoolTaskPoolSize(1),
		WithTaskPoolTaskQueueNumber(1),
		WithTaskPoolTaskQueueLength(1),
	)

	//task, cnt := newCountTask()
	task, _ := newCountTask()

	var wg sync.WaitGroup
	for i := 0; i < numCPU*numCPU; i++ {
		wg.Add(1)
		go func() {
			for j := 0; j < 100; j++ {
				ok := tp.AddTask(task)
				if !ok {
					t.Log(j)
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	tp.Close()

	//if taskCnt != atomic.LoadInt64(cnt) {
	//	//t.Error("want ", taskCnt, " got ", *cnt)
	//}
}

func TestTaskPoolSimple(t *testing.T) {
	numCPU := runtime.NumCPU()
	taskCnt := int64(numCPU * numCPU * 100)

	tp := NewTaskPoolSimple(1)

	task, cnt := newCountTask()

	var wg sync.WaitGroup
	for i := 0; i < numCPU*numCPU; i++ {
		wg.Add(1)
		go func() {
			for j := 0; j < 100; j++ {
				ok := tp.AddTask(task)
				if !ok {
					t.Log(j)
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()

	cntValue := atomic.LoadInt64(cnt)
	if taskCnt != cntValue {
		t.Error("want ", taskCnt, " got ", cntValue)
	}
}
A taskPoolSimple is simple to create: NewTaskPoolSimple only takes a size argument. Creating a TaskPool requires TaskPoolOption values built with WithTaskPoolTaskPoolSize, WithTaskPoolTaskQueueNumber, and WithTaskPoolTaskQueueLength; at minimum the pool size must be set, since validate panics when it is less than 1.
Summary
gost's GenericTaskPool interface defines the AddTask, AddTaskAlways, AddTaskBalance, Close, and IsClosed methods. It has two implementations: TaskPool and taskPoolSimple.
Doc
- gost