Preface

This library exists because Go's standard library does not offer features such as flexible control over when jobs run, or reducing many small submissions by batching tasks. In the past you could hand-roll something based on the worker-pool idea, but it was cumbersome to use and never grew into a general, reusable library.

So go-zero created executors.

Introduction

In go-zero, executors act as a task pool: they buffer multiple tasks and hand them off for batch processing, for example ClickHouse batch INSERT or SQL batch INSERT. You can also see executors in go-queue (there the ChunkExecutor is used to limit the byte size of submitted tasks).

So you can use this component when you have the following requirements:

  • Batch submission of tasks
  • Buffering some tasks and committing them lazily
  • Delaying task submission

Before I explain, here’s a general overview:

Interface design

Under the Executors package, there are the following executors:

Name                 Trigger threshold
bulkexecutor         submits when maxTasks (maximum number of tasks) is reached
chunkexecutor        submits when maxChunkSize (maximum number of bytes) is reached
periodicalexecutor   the basic executor
delayexecutor        delays execution of the incoming fn()
lessexecutor         (not described here; see the sketch below)
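
Since delayexecutor and lessexecutor do not come up again below, here is a hedged sketch of how they are typically used. The constructors and methods (NewDelayExecutor/Trigger, NewLessExecutor/DoOrDiscard) reflect my reading of the package and may differ between go-zero versions.

package main

import (
	"fmt"
	"time"

	"github.com/tal-tech/go-zero/core/executors"
)

func main() {
	// DelayExecutor: runs fn() once, delay time after Trigger() is called
	de := executors.NewDelayExecutor(func() {
		fmt.Println("runs about 2 seconds after Trigger()")
	}, 2*time.Second)
	de.Trigger()

	// LessExecutor: executes at most once within the given interval;
	// the other calls inside the window are discarded
	le := executors.NewLessExecutor(time.Minute)
	for i := 0; i < 3; i++ {
		le.DoOrDiscard(func() {
			fmt.Println("only the first call in the window actually runs")
		})
	}

	time.Sleep(3 * time.Second) // give the delayed fn() time to fire
}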

You’ll see that, except for delay and less, the other three are all executor + container combinations:

func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
	// Options pattern: it appears in many places in go-zero and is a good
	// design when there are many configuration items.
	// https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
	options := newBulkOptions()
	for _, opt := range opts {
		opt(&options)
	}
	// 1. The task container: execute is the actual execution function,
	//    maxTasks is the submission threshold.
	container := &bulkContainer{
		execute:  execute,
		maxTasks: options.cachedTasks,
	}
	// 2. We can see that BulkExecutor is built on top of PeriodicalExecutor.
	executor := &BulkExecutor{
		executor:  NewPeriodicalExecutor(options.flushInterval, container),
		container: container,
	}

	return executor
}

And the container is an interface:

TaskContainer interface {
	// AddTask adds a task to the container
	AddTask(task interface{}) bool
	// Execute actually runs the tasks, i.e. calls the execute func()
	Execute(tasks interface{})
	// RemoveAll removes all tasks from the container when the threshold is
	// reached and passes them to execute func() through the channel
	RemoveAll() interface{}
}

Thus, the dependencies are:

  • bulkexecutor: periodicalexecutor + bulkContainer
  • chunkexecutor: periodicalexecutor + chunkContainer

So if you want to build your own executor, you can implement the three methods of the TaskContainer interface and combine your container with PeriodicalExecutor, as the sketch below shows.
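
For example, a minimal sketch of a custom executor, assuming only the TaskContainer interface above and NewPeriodicalExecutor (both shown in this article); myContainer and NewMyExecutor are names made up for illustration:

package myexec

import (
	"time"

	"github.com/tal-tech/go-zero/core/executors"
)

// myContainer implements the three TaskContainer methods.
type myContainer struct {
	tasks    []interface{}
	execute  func(tasks []interface{})
	maxTasks int
}

func (c *myContainer) AddTask(task interface{}) bool {
	c.tasks = append(c.tasks, task)
	// returning true means: threshold reached, please pop and execute
	return len(c.tasks) >= c.maxTasks
}

func (c *myContainer) Execute(tasks interface{}) {
	// the popped tasks arrive here through the commander channel
	c.execute(tasks.([]interface{}))
}

func (c *myContainer) RemoveAll() interface{} {
	tasks := c.tasks
	c.tasks = nil
	return tasks
}

// NewMyExecutor combines the container with PeriodicalExecutor.
func NewMyExecutor(execute func(tasks []interface{})) *executors.PeriodicalExecutor {
	return executors.NewPeriodicalExecutor(time.Second, &myContainer{
		execute:  execute,
		maxTasks: 1024,
	})
}

Callers then use Add()/Flush()/Wait() on the returned PeriodicalExecutor, exactly as in the business example later in this article.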

So, going back to the overview above 👆, the rest of this article focuses on how PeriodicalExecutor is designed.

How to use

First look at how to use this component in your business:

There is a scheduled service that synchronizes data from MySQL to ClickHouse at a fixed time each day:

type DailyTask struct {
	ckGroup        *clickhousex.Cluster
	insertExecutor *executors.BulkExecutor
	mysqlConn      sqlx.SqlConn
}

Initialize bulkExecutor:

func (dts *DailyTask) Init() {
	// insertIntoCk() is the function that actually performs the insert
	dts.insertExecutor = executors.NewBulkExecutor(
		dts.insertIntoCk,
		executors.WithBulkInterval(time.Second*3), // the container automatically flushes its tasks every 3 seconds
		executors.WithBulkTasks(10240),            // maximum number of tasks in the container, usually a power of two
	)
}
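
The insertIntoCk callback is business code and is not shown in the article; below is a hedged sketch of what it might look like, assuming the executors.Execute signature func(tasks []interface{}) and a hypothetical batchInsert helper:

func (dts *DailyTask) insertIntoCk(tasks []interface{}) {
	// the executor hands over everything buffered since the last flush
	rows := make([]*model.Task, 0, len(tasks))
	for _, task := range tasks {
		if r, ok := task.(*model.Task); ok {
			rows = append(rows, r)
		}
	}
	// batchInsert is hypothetical: one big INSERT into ClickHouse via dts.ckGroup
	if err := dts.batchInsert(rows); err != nil {
		logx.Error(err)
	}
}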

As a bonus: ClickHouse is well suited to large-volume inserts; inserting in big batches is fast and makes the most of ClickHouse.

Writing the main business logic:

func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
	for item := range ch {
		r, vok := item.(*model.Task)
		if !vok {
			continue
		}
		err := dts.insertExecutor.Add(r)
		if err != nil {
			r.Tag = sqlFromDb.Tag
			r.TagId = sqlFromDb.Id
			r.InsertId = genInsertId()
			r.ToRedis = toRedis == constant.INCACHED
			r.UpdateWay = sqlFromDb.UpdateWay
			// 1️⃣
			err := dts.insertExecutor.Add(r)
			if err != nil {
				logx.Error(err)
			}
		}
	}
	// 2️⃣
	dts.insertExecutor.Flush()
	// 3️⃣
	dts.insertExecutor.Wait()
	return nil
}

Why Flush() and Wait()?

There are three steps in use (a minimal end-to-end sketch follows this list):

  • Add(): add a task
  • Flush(): flush the tasks in the container
  • Wait(): wait for all tasks to complete
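
Putting the three steps together, here is a minimal, self-contained sketch using the API shown in this article; the numbers are arbitrary:

package main

import (
	"fmt"
	"time"

	"github.com/tal-tech/go-zero/core/executors"
)

func main() {
	be := executors.NewBulkExecutor(
		func(tasks []interface{}) {
			// the real batch work (e.g. a batch INSERT) would go here
			fmt.Println("batch size:", len(tasks))
		},
		executors.WithBulkTasks(100),            // flush every 100 tasks
		executors.WithBulkInterval(time.Second), // or at least once a second
	)

	for i := 0; i < 250; i++ {
		be.Add(i) // 1. add tasks; every 100 of them triggers a batch
	}

	be.Flush() // 2. flush the 50 tasks still buffered in the container
	be.Wait()  // 3. wait until all batches have finished executing
}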

Source code analysis

The focus here is on PeriodicalExecutor, because the other two commonly used executors are built on it.

Initialization

func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
	executor := &PeriodicalExecutor{
		commander:   make(chan interface{}, 1),
		interval:    interval,
		container:   container,
		confirmChan: make(chan lang.PlaceholderType),
		newTicker: func(d time.Duration) timex.Ticker {
			return timex.NewTicker(interval)
		},
	}
	// ...
	return executor
}

  • commander: the channel that passes tasks
  • container: temporarily stores the tasks added with Add()
  • confirmChan: blocks Add(); at the start of executeTasks() the block is released
  • ticker: a timer, so that even when Add() is not triggered there is still a chance to execute on schedule and release the buffered tasks in time

Add()

After initialization, the first step in the business logic is to add the task to the executor:

func (pe *PeriodicalExecutor) Add(task interface{}) {
	if vals, ok := pe.addAndCheck(task); ok {
		pe.commander <- vals
		<-pe.confirmChan
	}
}

func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
	pe.lock.Lock()
	defer func() {
		// start is false by default
		var start bool
		if !pe.guarded {
			// backgroundFlush() resets guarded
			pe.guarded = true
			start = true
		}
		pe.lock.Unlock()
		// backgroundFlush() is started when the first task is added:
		// a background goroutine that flushes tasks
		if start {
			pe.backgroundFlush()
		}
	}()
	// Controls maxTasks: once the container reaches maxTasks,
	// the tasks are popped and returned
	if pe.container.AddTask(task) {
		return pe.container.RemoveAll(), true
	}

	return nil, false
}

AddTask() inside addAndCheck() enforces the maximum number of tasks: once the threshold is reached, RemoveAll() pops all buffered tasks out of the container and sends them to commander, where a goroutine loop reads them and then executes the tasks.
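
For reference, the bulkContainer side of this handshake is roughly the following (simplified from memory, so check the source for the exact version); it shows how AddTask() signals the threshold that addAndCheck() reacts to:

func (bc *bulkContainer) AddTask(task interface{}) bool {
	bc.tasks = append(bc.tasks, task)
	// true tells addAndCheck() that the threshold was reached
	return len(bc.tasks) >= bc.maxTasks
}

func (bc *bulkContainer) RemoveAll() interface{} {
	tasks := bc.tasks
	bc.tasks = nil
	return tasks
}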

backgroundFlush()

This starts a background goroutine that keeps flushing the tasks in the container:

func (pe *PeriodicalExecutor) backgroundFlush() {
	// threading.GoSafe is a wrapped go func(){}
	threading.GoSafe(func() {
		ticker := pe.newTicker(pe.interval)
		defer ticker.Stop()

		var commanded bool
		last := timex.Now()
		for {
			select {
			// get the []tasks from the channel
			case vals := <-pe.commander:
				commanded = true
				// essentially waitGroup.Add(1)
				pe.enterExecution()
				// release the block in Add(); the staging area is empty again,
				// so new tasks can be added
				pe.confirmChan <- lang.Placeholder
				// run the actual task logic
				pe.executeTasks(vals)
				last = timex.Now()
			case <-ticker.Chan():
				if commanded {
					// because select picks cases randomly, if both conditions are
					// satisfied at the same time the flag is reset and the flush
					// below is skipped for this round
					// https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
					commanded = false
				} else if pe.Flush() {
					// the flush succeeded, so reset the reference time; the staging
					// area is empty and the next timed flush starts from here
					last = timex.Now()
				} else if timex.Since(last) > pe.interval*idleRound {
					// maxTasks was not reached, Flush() had nothing to do, and
					// last -> now has been idle for too long, so exit this goroutine;
					// only after guarded is reset will a new backgroundFlush()
					// goroutine be started
					pe.guarded = false
					// flush once more to avoid missing tasks
					pe.Flush()
					return
				}
			}
		}
	})
}

Overall, there are two flows:

  • commander receives the tasks passed in by RemoveAll(), executes them, and releases the block in Add() so that Add() can continue
  • when the ticker fires, if the first flow has not run, it calls Flush() automatically and executes the tasks

Wait()

backgroundFlush() mentioned a function, enterExecution():

func (pe *PeriodicalExecutor) enterExecution() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Add(1)
	})
}

func (pe *PeriodicalExecutor) Wait() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Wait()
	})
}

This is why you need dts.insertExecutor.Wait() at the end: it waits for all the goroutines' tasks to complete.

Thoughts

While reading the source code, I thought about some other design points. Have you run into similar questions?

  • While analyzing executors, you will find that lock is used in many places

There is a race condition in concurrent scenarios; go test -race reports it, and locking is used to avoid it.

  • While analyzing confirmChan, you will find that it was only added in a later commit. Why is that?

Previously: wg.Add(1) was written inside executeTasks(); now: wg.Add(1) happens first, and then the block on confirmChan is released.

If the execute func blocked, Add() itself would not be blocked and could keep adding tasks, so the caller might reach executor.Wait() very quickly; in that case wg.Wait() could run before wg.Add(), causing a panic.
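
To make the hazard concrete, here is a tiny, hypothetical sketch in plain sync.WaitGroup terms (not the library's code): if Add(1) only happens inside the goroutine that runs the tasks, a caller can reach Wait() first and Wait() returns without waiting; the sync.WaitGroup rules require Add with a positive delta to happen before Wait, and breaking that rule is the kind of misuse behind the panic mentioned above.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	go func() {
		// old design, roughly: the counter is only bumped once the
		// task-executing goroutine gets going, which may be after Wait()
		time.Sleep(10 * time.Millisecond)
		wg.Add(1)
		defer wg.Done()
		fmt.Println("executing tasks")
	}()

	wg.Wait() // the counter is still 0 here, so this returns immediately
	fmt.Println("Wait() returned before the tasks ran")

	time.Sleep(20 * time.Millisecond) // only so the late task output is visible
}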

See the latest version of TestPeriodicalExecutor_WaitFast(); you can reproduce the issue by running it against the old implementation.

Conclusion

The analysis of the remaining executors is left to you; go read the source code.

In short, the overall design:

  • Follows interface-oriented design
  • Makes flexible use of channel, waitGroup and other concurrency primitives
  • Combines the execution unit with the storage unit

There are many other useful component tools in go-zero. Using them well can greatly improve service performance and development efficiency. I hope this article brings you something useful.

Project address

If you find this article helpful, please give the project a star on GitHub 🤝.

You are also welcome to use go-zero: github.com/tal-tech/go…