Preface
The purpose of creating this library is that Go's standard library does not offer features such as flexibly defining how jobs run, or reducing the overhead of many small submissions by batching tasks. In the past you could write code based on the worker-pool idea, but it was cumbersome to use and was never turned into a generic, complete library.
So go-zero created the executors.
Introduction
In go-zero, executors act as a task pool: they buffer multiple tasks and hand them, as a batch, to the function that does the actual processing, for example a ClickHouse batch insert or a SQL batch insert. You can also see executors at work in go-queue (the queue uses ChunkExecutor to limit the byte size of submitted tasks).
So you can use this component when you have the following requirements:
- Batch submission of tasks
- Buffering of tasks with lazy commit
- Delayed task submission
Before I explain, here’s a general overview:
Interface design
Under the Executors package, there are the following executors:
Name | Trigger threshold |
---|---|
bulkexecutor | Submits once maxTasks (the maximum number of tasks) is reached |
chunkexecutor | Submits once maxChunkSize (the maximum number of bytes) is reached |
periodicalexecutor | The basic executor |
delayexecutor | Delays execution of the incoming fn() |
lessexecutor | |
You can see that, apart from delay and less, the other three are executor + container combinations:
```go
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
	// Option pattern: it appears in many places in go-zero. With several
	// configuration items, it is the cleaner design choice.
	// https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
	options := newBulkOptions()
	for _, opt := range opts {
		opt(&options)
	}
	// 1. Task container: [execute: the actual execution function] [maxTasks: the execution threshold]
	container := &bulkContainer{
		execute:  execute,
		maxTasks: options.cachedTasks,
	}
	// 2. BulkExecutor relies on PeriodicalExecutor underneath
	executor := &BulkExecutor{
		executor:  NewPeriodicalExecutor(options.flushInterval, container),
		container: container,
	}
	return executor
}
```
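The options pattern linked in the comment above is worth a quick illustration. Below is a minimal sketch of how options such as WithBulkTasks and WithBulkInterval are typically defined; it is based on the names appearing in this article, not a verbatim copy of go-zero's code.

```go
// A minimal sketch of the functional options pattern (names follow the article, details may differ).
type bulkOptions struct {
	cachedTasks   int
	flushInterval time.Duration
}

// BulkOption mutates the options struct.
type BulkOption func(options *bulkOptions)

// WithBulkTasks sets the maximum number of buffered tasks before a flush.
func WithBulkTasks(tasks int) BulkOption {
	return func(options *bulkOptions) {
		options.cachedTasks = tasks
	}
}

// WithBulkInterval sets how often the container is flushed automatically.
func WithBulkInterval(duration time.Duration) BulkOption {
	return func(options *bulkOptions) {
		options.flushInterval = duration
	}
}
```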
And the container is an interface:
```go
TaskContainer interface {
	// AddTask adds a task to the container
	AddTask(task interface{}) bool
	// Execute actually runs the execute func() that was passed in
	Execute(tasks interface{})
	// RemoveAll removes all tasks from the container once the threshold is reached,
	// and passes them through the channel to execute func()
	RemoveAll() interface{}
}
```
Thus, the dependencies are:

- bulkexecutor: periodicalexecutor + bulkContainer
- chunkexecutor: periodicalexecutor + chunkContainer
So if you want to build your own executor, you can implement the three methods of the container interface and combine it with PeriodicalExecutor, as in the sketch below.
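For example, here is a minimal sketch of a custom container, just to show the shape of the three methods; lineContainer and its fields are made-up names for illustration, not part of go-zero:

```go
// lineContainer is a hypothetical container that batches strings.
type lineContainer struct {
	lines    []string
	maxLines int
	execute  func(lines []string)
}

// AddTask buffers a task; returning true signals that the threshold is reached
// and PeriodicalExecutor should flush.
func (c *lineContainer) AddTask(task interface{}) bool {
	if line, ok := task.(string); ok {
		c.lines = append(c.lines, line)
	}
	return len(c.lines) >= c.maxLines
}

// Execute runs the actual batch logic on the tasks popped by RemoveAll().
func (c *lineContainer) Execute(tasks interface{}) {
	c.execute(tasks.([]string))
}

// RemoveAll pops all buffered tasks and resets the container.
func (c *lineContainer) RemoveAll() interface{} {
	lines := c.lines
	c.lines = nil
	return lines
}
```

Composing it is then just executors.NewPeriodicalExecutor(time.Second, &lineContainer{...}), the same way bulkexecutor and chunkexecutor do it.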
Going back to the overview above 👆, let's focus on how PeriodicalExecutor is designed.
How to use
First look at how to use this component in your business:
Suppose there is a scheduled service that synchronizes data from MySQL to ClickHouse at a fixed time each day:
```go
type DailyTask struct {
	ckGroup        *clickhousex.Cluster
	insertExecutor *executors.BulkExecutor
	mysqlConn      sqlx.SqlConn
}
```
Initialize bulkExecutor:
```go
func (dts *DailyTask) Init() {
	// insertIntoCk() is the actual insert execution function
	dts.insertExecutor = executors.NewBulkExecutor(
		dts.insertIntoCk,
		executors.WithBulkInterval(time.Second*3), // automatically flush the tasks in the container every 3 seconds
		executors.WithBulkTasks(10240),            // maximum number of tasks in the container; usually a power of two
	)
}
```
As a side note: ClickHouse is well suited to large batch inserts; its insert throughput is high, and batching the inserts makes the most of it.
Writing the main business logic:
```go
func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
	for item := range ch {
		r, vok := item.(*model.Task)
		if !vok {
			continue
		}
		r.Tag = sqlFromDb.Tag
		r.TagId = sqlFromDb.Id
		r.InsertId = genInsertId()
		r.ToRedis = toRedis == constant.INCACHED
		r.UpdateWay = sqlFromDb.UpdateWay
		// 1️⃣
		err := dts.insertExecutor.Add(r)
		if err != nil {
			logx.Error(err)
		}
	}
	// 2️⃣
	dts.insertExecutor.Flush()
	// 3️⃣
	dts.insertExecutor.Wait()
	return nil
}
```
Why Flush() and Wait()?
There are three steps when using it:

- `Add()`: add a task
- `Flush()`: flush the tasks remaining in the container
- `Wait()`: wait for all tasks to finish executing
Source code analysis
The focus here is on PeriodicalExecutor, because the other two commonly used executors are built on top of it.
Initialization
```go
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
	executor := &PeriodicalExecutor{
		commander:   make(chan interface{}, 1),
		interval:    interval,
		container:   container,
		confirmChan: make(chan lang.PlaceholderType),
		newTicker: func(d time.Duration) timex.Ticker {
			return timex.NewTicker(interval)
		},
	}
	...
	return executor
}
```
- `commander`: the channel that passes the `tasks`
- `container`: temporarily stores the tasks added by `Add()`
- `confirmChan`: blocks `Add()`; at the start of the next `executeTasks()` round the block is released
- `ticker`: a timer that prevents `Add()` from blocking indefinitely; the scheduled run still fires so that buffered tasks are released in time
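Putting these fields together, the struct looks roughly like this. It is reconstructed from the fields used in this article, so the actual go-zero definition may differ in details:

```go
// Reconstructed sketch of PeriodicalExecutor (not verbatim go-zero code).
type PeriodicalExecutor struct {
	commander   chan interface{}                   // passes the popped []tasks to the background goroutine
	interval    time.Duration                      // interval of the scheduled flush
	container   TaskContainer                      // buffers the tasks added by Add()
	confirmChan chan lang.PlaceholderType          // blocks Add() until the staging area has been handed over
	newTicker   func(d time.Duration) timex.Ticker // ticker factory, convenient for tests
	waitGroup   sync.WaitGroup                     // tracks in-flight executions for Wait()
	wgBarrier   syncx.Barrier                      // serializes waitGroup.Add()/Wait()
	guarded     bool                               // whether a backgroundFlush() goroutine is alive
	lock        sync.Mutex                         // protects container and guarded
}
```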
Add()
After initialization, the first step in the business logic is to add the task to the executor:
```go
func (pe *PeriodicalExecutor) Add(task interface{}) {
	if vals, ok := pe.addAndCheck(task); ok {
		pe.commander <- vals
		<-pe.confirmChan
	}
}

func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
	pe.lock.Lock()
	defer func() {
		// start is false by default
		var start bool
		if !pe.guarded {
			// backgroundFlush() will reset guarded
			pe.guarded = true
			start = true
		}
		pe.lock.Unlock()
		// backgroundFlush() is started when the first task is added: a background goroutine flushes the tasks
		if start {
			pe.backgroundFlush()
		}
	}()
	// Controls maxTasks: once the container reaches maxTasks, pop the tasks and return them
	if pe.container.AddTask(task) {
		return pe.container.RemoveAll(), true
	}
	return nil, false
}
```
addAndCheck() uses AddTask() to control the maximum number of tasks. Once the threshold is exceeded, RemoveAll() pops all tasks from the container and passes them to commander, where a background goroutine loop reads and then executes them.
backgroundFlush()
This starts a background goroutine that keeps flushing the tasks in the container:
```go
func (pe *PeriodicalExecutor) backgroundFlush() {
	// GoSafe wraps go func(){} and recovers from panics
	threading.GoSafe(func() {
		ticker := pe.newTicker(pe.interval)
		defer ticker.Stop()

		var commanded bool
		last := timex.Now()
		for {
			select {
			// Receive []tasks from the channel
			case vals := <-pe.commander:
				commanded = true
				// Essentially: wg.Add(1)
				pe.enterExecution()
				// Release the block in Add(); the staging area is now empty and new tasks can be added
				pe.confirmChan <- lang.Placeholder
				// Execute the actual task logic
				pe.executeTasks(vals)
				last = timex.Now()
			case <-ticker.Chan():
				if commanded {
					// Because select picks among ready cases at random, when both cases are ready
					// at the same time this flag is reset and the flush below is skipped this round
					// https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
					commanded = false
				} else if pe.Flush() {
					// The flush succeeded, so reset the timer baseline; the staging area is empty,
					// start the next scheduled flush
					last = timex.Now()
				} else if timex.Since(last) > pe.interval*idleRound {
					// maxTasks was not reached, Flush() returned false, and last->now is too long: go idle.
					// Only after guarded is reset will a new backgroundFlush() goroutine be started
					pe.guarded = false
					// Flush once more to avoid missing tasks
					pe.Flush()
					return
				}
			}
		}
	})
}
```
Overall, there are two flows:

- `commander` receives the tasks passed in by `RemoveAll()`, executes them, and releases the block on `Add()` so that `Add()` can continue
- `ticker`: when the interval is up and the first flow has not run, `Flush()` is triggered automatically and the tasks are executed as well
Wait()
In backgroundFlush(), one function deserves a mention: enterExecution():
```go
func (pe *PeriodicalExecutor) enterExecution() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Add(1)
	})
}

func (pe *PeriodicalExecutor) Wait() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Wait()
	})
}
```
This is why you need dts.insertExecutor.Wait() at the end of the business code: it waits for all goroutine tasks to complete.
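To balance the wg.Add(1) done in enterExecution(), executeTasks() has to signal completion when it finishes. Here is a rough sketch inferred from the flow described above; doneExecution() is assumed to be the counterpart of enterExecution(), and the real go-zero version also recovers from panics:

```go
// Sketch of executeTasks(), inferred from the article (not verbatim go-zero code).
func (pe *PeriodicalExecutor) executeTasks(tasks interface{}) {
	// Assumed helper: essentially wg.Done(), pairing with enterExecution()'s wg.Add(1)
	defer pe.doneExecution()

	if tasks != nil {
		pe.container.Execute(tasks)
	}
}
```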
Thinking
While reading the source code, I thought about some other design choices. Have you run into similar questions?
- While analyzing `executors`, you will find `lock` used in many places. `go test` detects data races; locks are used to avoid them.
- While analyzing `confirmChan`, I found that it only appeared in a later commit. Why was it designed this way?
  - Before: `wg.Add(1)` was written inside `executeTasks()`. Now: `wg.Add(1)` first, then unblock `confirmChan`.
  - If the execute func blocks, `Add()` is still being called, and since it is no longer blocked, `executor.Wait()` may be reached very quickly; in that case `wg.Wait()` can run before `wg.Add(1)`, causing a panic.
  - See the latest version of `TestPeriodicalExecutor_WaitFast()`; you can reproduce the problem by running it against the older logic.
Conclusion
The analysis of the remaining executors is left for you to explore in the source code.
In short, the overall design:

- follows interface-oriented design
- makes flexible use of concurrency primitives such as `channel` and `waitgroup`
- combines an execution unit with a storage unit
There are many other useful component tools in go-zero; using them well can greatly improve service performance and development efficiency. I hope this article brings you something of value.
Project address
If you think this article is good, feel free to go to GitHub and give it a star 🤝.
You are also welcome to use go-zero: github.com/tal-tech/go…