sequence

This article focuses on gost’s GenericTaskPool

GenericTaskPool

gost/sync/task_pool.go

// GenericTaskPool represents an generic task pool. type GenericTaskPool interface { // AddTask wait idle worker add task AddTask(t task) bool // AddTaskAlways add task to queues or do it immediately AddTaskAlways(t task) // AddTaskBalance add task to idle queue AddTaskBalance(t task) // Close use to close the task pool Close() // IsClosed use  to check pool status. IsClosed() bool }Copy the code

The GenericTaskPool interface defines AddTask, AddTaskAlways, AddTaskBalance, Close, and IsClosed interfaces

TaskPool

gost/sync/task_pool.go

type TaskPool struct {
	TaskPoolOptions

	idx    uint32 // round robin index
	qArray []chan task
	wg     sync.WaitGroup

	once sync.Once
	done chan struct{}
}

// return false when the pool is stop
func (p *TaskPool) AddTask(t task) (ok bool) {
	idx := atomic.AddUint32(&p.idx, 1)
	id := idx % uint32(p.tQNumber)

	select {
	case <-p.done:
		return false
	default:
		p.qArray[id] <- t
		return true
	}
}

func (p *TaskPool) AddTaskAlways(t task) {
	id := atomic.AddUint32(&p.idx, 1) % uint32(p.tQNumber)

	select {
	case p.qArray[id] <- t:
		return
	default:
		goSafely(t)
	}
}

// do it immediately when no idle queue
func (p *TaskPool) AddTaskBalance(t task) {
	length := len(p.qArray)

	// try len/2 times to lookup idle queue
	for i := 0; i < length/2; i++ {
		select {
		case p.qArray[rand.Intn(length)] <- t:
			return
		default:
			continue
		}
	}

	goSafely(t)
}

// check whether the session has been closed.
func (p *TaskPool) IsClosed() bool {
	select {
	case <-p.done:
		return true

	default:
		return false
	}
}

func (p *TaskPool) Close() {
	p.stop()
	p.wg.Wait()
	for i := range p.qArray {
		close(p.qArray[i])
	}
}
Copy the code

TaskPool specifies TaskPoolOptions, IDX, qArray, WG, once, done. It implements the GenericTaskPool interface; The AddTask method returns false if the pool is done, and increments idx if the pool is not done, and then computs the ID based on tQNumber to qArray[id]. The AddTaskAlways method ignores the pool closure message. The AddTaskBalance method tries len/2 times to write a task randomly to the qArray. If both attempts fail, goSafely executes. IsClosed reads the channel information of done. The Close method performs stop and WG.wait (), and finally executes Close through qArray one by one

NewTaskPool

gost/sync/task_pool.go

func NewTaskPool(opts ... TaskPoolOption) GenericTaskPool { var tOpts TaskPoolOptions for _, opt := range opts { opt(&tOpts) } tOpts.validate() p := &TaskPool{ TaskPoolOptions: tOpts, qArray: make([]chan task, tOpts.tQNumber), done: make(chan struct{}), } for i := 0; i < p.tQNumber; i++ { p.qArray[i] = make(chan task, p.tQLen) } p.start() return p }Copy the code

NewTaskPool Creates a TaskPool using TaskPoolOptions

TaskPoolOption

gost/sync/options.go

const (
	defaultTaskQNumber = 10
	defaultTaskQLen    = 128
)

/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////

// TaskPoolOptions is optional settings for task pool
type TaskPoolOptions struct {
	tQLen      int // task queue length. buffer size per queue
	tQNumber   int // task queue number. number of queue
	tQPoolSize int // task pool size. number of workers
}

func (o *TaskPoolOptions) validate() {
	if o.tQPoolSize < 1 {
		panic(fmt.Sprintf("illegal pool size %d", o.tQPoolSize))
	}

	if o.tQLen < 1 {
		o.tQLen = defaultTaskQLen
	}

	if o.tQNumber < 1 {
		o.tQNumber = defaultTaskQNumber
	}

	if o.tQNumber > o.tQPoolSize {
		o.tQNumber = o.tQPoolSize
	}
}

type TaskPoolOption func(*TaskPoolOptions)

// WithTaskPoolTaskPoolSize set @size of the task queue pool size
func WithTaskPoolTaskPoolSize(size int) TaskPoolOption {
	return func(o *TaskPoolOptions) {
		o.tQPoolSize = size
	}
}

// WithTaskPoolTaskQueueLength set @length of the task queue length
func WithTaskPoolTaskQueueLength(length int) TaskPoolOption {
	return func(o *TaskPoolOptions) {
		o.tQLen = length
	}
}

// WithTaskPoolTaskQueueNumber set @number of the task queue number
func WithTaskPoolTaskQueueNumber(number int) TaskPoolOption {
	return func(o *TaskPoolOptions) {
		o.tQNumber = number
	}
}
Copy the code

TaskPoolOptions defines the tQLen, tQNumber, and tQPoolSize attributes. Provides WithTaskPoolTaskPoolSize, WithTaskPoolTaskQueueLength, WithTaskPoolTaskQueueNumber, validate method

start

gost/sync/task_pool.go

func (p *TaskPool) start() { for i := 0; i < p.tQPoolSize; i++ { p.wg.Add(1) workerID := i q := p.qArray[workerID%p.tQNumber] p.safeRun(workerID, q) } } func (p *TaskPool) safeRun(workerID int, q chan task) { gxruntime.GoSafely(nil, false, func() { err := p.run(int(workerID), q) if err ! = nil { // log error to stderr log.Printf("gost/TaskPool.run error: %s", err.Error()) } }, nil, ) }Copy the code

The start method executes safeRun one by one according to tQPoolSize; SafeRun method executed by GoSafely p.run(int(workerID), q)

run

gost/sync/task_pool.go

// worker func (p *TaskPool) run(id int, q chan task) error { defer p.wg.Done() var ( ok bool t task ) for { select { case <-p.done: if 0 < len(q) { return fmt.Errorf("task worker %d exit now while its task buffer length %d is greater than 0", id, len(q)) } return nil case t, ok = <-q: if ok { func() { defer func() { if r := recover(); r ! = nil { fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n", time.Now(), r, string(debug.Stack())) } }() t() }() } } } }Copy the code

The run method uses the for loop to select, and exits the loop if it reads p.done. If a task is read, the task is executed

taskPoolSimple

gost/sync/task_pool.go

type taskPoolSimple struct {
	work chan task     // task channel
	sem  chan struct{} // gr pool size

	wg sync.WaitGroup

	once sync.Once
	done chan struct{}
}
Copy the code

TaskPoolSimple defines the work, SEM, WG, once, done attributes. It implements the GenericTaskPool interface; The AddTask method determines done first, and then writes work and SEM. The AddTaskAlways method executes the goSafely command if the select is not channel; The AddTaskBalance method actually executes AddTaskAlways; The IsClosed method reads the done information. Close: stop and Wg.wait ()

The instance

gost/sync/task_pool_test.go

func TestTaskPool(t *testing.T) { numCPU := runtime.NumCPU() //taskCnt := int64(numCPU * numCPU * 100) tp := NewTaskPool( WithTaskPoolTaskPoolSize(1), WithTaskPoolTaskQueueNumber(1), WithTaskPoolTaskQueueLength(1), ) //task, cnt := newCountTask() task, _ := newCountTask() var wg sync.WaitGroup for i := 0; i < numCPU*numCPU; i++ { wg.Add(1) go func() { for j := 0; j < 100; j++ { ok := tp.AddTask(task) if ! ok { t.Log(j) } } wg.Done() }() } wg.Wait() tp.Close() //if taskCnt ! = atomic.LoadInt64(cnt) { // //t.Error("want ", taskCnt, " got ", *cnt) //} } func TestTaskPoolSimple(t *testing.T) { numCPU := runtime.NumCPU() taskCnt := int64(numCPU * numCPU * 100) tp := NewTaskPoolSimple(1) task, cnt := newCountTask() var wg sync.WaitGroup for i := 0; i < numCPU*numCPU; i++ { wg.Add(1) go func() { for j := 0; j < 100; j++ { ok := tp.AddTask(task) if ! ok { t.Log(j) } } wg.Done() }() } wg.Wait() cntValue := atomic.LoadInt64(cnt) if taskCnt ! = cntValue { t.Error("want ", taskCnt, " got ", cntValue) } }Copy the code

TaskPoolSimple is easy to create by providing the size parameter. TaskPool create the need for TaskPoolOption with WithTaskPoolTaskPoolSize, WithTaskPoolTaskQueueNumber, WithTaskPoolTaskQueueLength these option

summary

The GenericTaskPool interface of GOST defines AddTask, AddTaskAlways, AddTaskBalance, Close, IsClosed interfaces. There are two implementations of TaskPool and taskPoolSimple.

doc

  • gost