How to implement a simple ForkJoin framework in Golang
Here is my project address:
go-fork-join
Basic principles
What is ForkJoin
I first came into contact with the ForkJoin framework while learning about parallel streams in Java (the Stream API).
ForkJoin is well suited to modern machines with multiple CPU cores. The framework breaks a large task into several smaller tasks that do not depend on each other and, using a divide-and-conquer strategy, distributes those smaller tasks across the CPU cores.
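To make the divide-and-conquer idea concrete, here is a minimal Go sketch in the fork/join style. It is not the project's code: `parallelSum` and `threshold` are hypothetical names, and instead of a worker pool it simply starts one goroutine per split and lets the Go scheduler spread the goroutines across cores.

```go
package main

import "fmt"

// threshold is the size below which a slice is summed directly instead of
// being split further (an arbitrary value chosen for this sketch).
const threshold = 1000

// parallelSum forks the left half of nums onto a new goroutine, computes the
// right half on the current goroutine, then joins the two results.
func parallelSum(nums []int) int {
	if len(nums) <= threshold {
		total := 0
		for _, n := range nums {
			total += n
		}
		return total
	}
	mid := len(nums) / 2
	left := make(chan int, 1)                       // capacity 1, so the forked goroutine never blocks on send
	go func() { left <- parallelSum(nums[:mid]) }() // fork
	right := parallelSum(nums[mid:])
	return <-left + right // join
}

func main() {
	nums := make([]int, 10000)
	for i := range nums {
		nums[i] = i + 1
	}
	fmt.Println(parallelSum(nums)) // 50005000
}
```

The subtasks share nothing except their disjoint halves of the slice, which is exactly the "smaller tasks that do not depend on each other" property the framework relies on.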
Many blogs already explain it well, so I won't go into detail here. Here are a few of the blogs I learned from:
- juejin.cn/post/684490…
Task stealing
The task-stealing algorithm lets a worker take elements from the head of its own work queue or from the tail of other workers' work queues.
Each time the task queues are polled, a worker first tries to get a task from its own queue. If that queue has no pending task, the worker picks another worker's queue at random and steals a task from it.
Joining a subtask's result
In Java, join has to keep checking the task's execution state and returns the result once the task has finished. In Golang, channels make Java's Future pattern very easy to implement: joining a task only requires reading from its result channel. A receive on a channel that holds no data blocks the reader until a value arrives, and giving the channel a capacity of 1 also lets the worker deposit the result without waiting for the reader.
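func (f *ForkJoinTask) Join() (bool, interface{}) {
	result := f.result
	for {
		select {
		case data, ok := <-result:
			if ok {
				return true, data
			}
			// The channel was closed by a worker that panicked. A nil channel is
			// never ready, so later iterations wait only for the pool's cancellation
			// instead of spinning on the closed channel.
			result = nil
		case <-f.ctx.Done():
			// the pool was cancelled: re-raise the error it recorded
			panic(f.taskPool.err)
		}
	}
}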
The core code
Task queue
The dispatch loop traverses the task queues. There is not just one task queue but several, and on each pass the loop tries to take a task from one of them. If a task exists, it is packaged into a struct together with its ForkJoinTask and context. A worker is then retrieved for the task, and the packaged task is sent to that worker's channel, so the task is handed off asynchronously.
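func (fp *ForkJoinPool) run(ctx context.Context) {
	go func() {
		wId := int32(0)
		for {
			select {
			case <-ctx.Done():
				// the pool was cancelled: record the worker pool's error and stop dispatching
				fmt.Printf("here is err")
				fp.err = fp.wp.err
				return
			default:
				// try to take a task from the queue of worker wId
				hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
				if hasTask {
					// package the task with its ForkJoinTask and context, then hand it to a worker
					fp.wp.Submit(ctx, &struct {
						T Task
						F *ForkJoinTask
						C context.Context
					}{T: job, F: ft, C: ctx})
				}
				// move on to the next queue, round-robin
				// (this loop busy-polls while all queues are empty)
				wId = (wId + 1) % fp.cap
			}
		}
	}()
}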
Get a Worker
During ForkJoin initialization, the worker pool is populated with one worker per CPU core, as reported by runtime.NumCPU():
func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
	...
	wCnt := runtime.NumCPU()
	for i := 0; i < wCnt; i++ {
		w := newWorker(p)
		w.run(ctx)
		p.workers = append(p.workers, w)
	}
	...
}
Every task must then be processed by a worker. Each time a worker is needed, the pool first checks whether it has an idle worker; if it does, that worker is taken directly. Otherwise the pool reuses a worker from its cache (workerCache) if one is available, or creates a new worker, and starts it so that it can accept the task.
func (p *Pool) retrieveWorker(ctx context.Context) *Worker {
	var w *Worker
	// Check and pop under the same lock, so two callers cannot take the same idle worker.
	p.lock.Lock()
	if n := len(p.workers); n > 0 {
		w = p.workers[n-1]
		p.workers = p.workers[:n-1]
	}
	p.lock.Unlock()
	if w == nil {
		// No idle worker: reuse a cached one if possible, otherwise create a new one.
		if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
			w = cacheWorker.(*Worker)
		} else {
			w = &Worker{
				pool: p,
				job: make(chan *struct {
					T Task
					F *ForkJoinTask
					C context.Context
				}, 1),
			}
		}
		// start the worker's goroutine before handing it a task
		w.run(ctx)
	}
	return w
}
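A reduced sketch of this idle-stack-plus-cache lookup, with a hypothetical `worker` type that has no goroutine attached and hypothetical `pool`, `retrieve`, `release` and `retire` names. It assumes, as the Get/Put calls suggest, that workerCache is a `sync.Pool`; a `sync.Pool` may discard cached items at any garbage collection, so it can only serve as a best-effort cache.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// worker is a stand-in for the article's Worker, without the goroutine.
type worker struct{ id int }

// pool keeps idle workers on a mutex-guarded stack and uses a sync.Pool as a
// fallback cache before creating new workers.
type pool struct {
	mu      sync.Mutex
	idle    []*worker
	cache   sync.Pool
	created int
}

// newPool pre-creates n idle workers.
func newPool(n int) *pool {
	p := &pool{}
	for i := 0; i < n; i++ {
		p.created++
		p.idle = append(p.idle, &worker{id: p.created})
	}
	return p
}

// retrieve returns an idle worker, else a cached one, else a new one.
func (p *pool) retrieve() *worker {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n := len(p.idle); n > 0 {
		w := p.idle[n-1]
		p.idle = p.idle[:n-1]
		return w
	}
	if c := p.cache.Get(); c != nil { // Get returns nil when the cache is empty and New is unset
		return c.(*worker)
	}
	p.created++
	return &worker{id: p.created}
}

// release pushes a worker back onto the idle stack after its task finishes.
func (p *pool) release(w *worker) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.idle = append(p.idle, w)
}

// retire moves a worker into the cache instead of the idle stack.
func (p *pool) retire(w *worker) { p.cache.Put(w) }

func main() {
	p := newPool(runtime.NumCPU()) // one worker per CPU core, as the article's newPool does
	w := p.retrieve()
	p.release(w)
	fmt.Println(len(p.idle) == runtime.NumCPU()) // true
}
```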
Worker
A Worker is the object that actually executes tasks. Each worker is bound to one goroutine and has a channel on which it receives tasks asynchronously; the goroutine takes tasks off that channel and executes them. When a task finishes, the worker is returned to the worker pool.
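func (w *Worker) run(ctx context.Context) {
	go func() {
		var tmpTask *ForkJoinTask
		defer func() {
			// A task panicked: report it, record the error and close the task's
			// result channel so that a goroutine blocked in Join wakes up.
			if p := recover(); p != nil {
				w.pool.panicHandler(p)
				if tmpTask != nil {
					w.pool.err = p
					close(tmpTask.result)
				}
			}
		}()
		for {
			// Waiting on ctx and w.job in one select lets a cancelled
			// context also stop a worker that is idle.
			select {
			case <-ctx.Done():
				fmt.Println("An exception occurred and the task has stopped")
				return
			case job := <-w.job:
				if job == nil {
					// a nil job retires the worker into the cache
					w.pool.workerCache.Put(w)
					return
				}
				tmpTask = job.F
				job.F.result <- job.T.Compute()
				// hand the worker back to the pool once the task is done
				w.pool.releaseWorker(w)
			}
		}
	}()
}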
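The panic handling in the worker is what lets a joiner return instead of hanging. This reduced sketch, with hypothetical `task`, `worker` and `join` names and no context, shows the mechanism on its own: the deferred `recover` records the error and closes the current task's result channel, and a receive on a closed channel returns immediately with `ok == false`.

```go
package main

import "fmt"

// task is a unit of work with its own result channel.
type task struct {
	compute func() int
	result  chan int
}

// worker runs tasks from jobs on a single goroutine.
type worker struct {
	jobs chan *task
	err  error // set when a task panics; read it only after join reports ok == false
}

func (w *worker) run() {
	go func() {
		var current *task
		defer func() {
			if p := recover(); p != nil {
				w.err = fmt.Errorf("task panicked: %v", p)
				if current != nil {
					close(current.result) // wakes the joiner; the close happens after w.err is set
				}
			}
		}()
		for t := range w.jobs {
			current = t
			t.result <- t.compute()
		}
	}()
}

// join waits for t's result; ok is false if the worker closed the channel after a panic.
func join(t *task) (int, bool) {
	v, ok := <-t.result
	return v, ok
}

func main() {
	w := &worker{jobs: make(chan *task, 1)}
	w.run()

	good := &task{compute: func() int { return 5 }, result: make(chan int, 1)}
	w.jobs <- good
	fmt.Println(join(good)) // 5 true

	bad := &task{compute: func() int { panic("boom") }, result: make(chan int, 1)}
	w.jobs <- bad
	fmt.Println(join(bad)) // 0 false
	fmt.Println(w.err)     // task panicked: boom
}
```

Reading `w.err` without a lock is safe here because Go's memory model orders a channel close before any receive that returns because the channel is closed. After a panic this worker's goroutine has exited, so a real pool must replace or restart it.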
Results
Areas that are being improved
Task-stealing algorithm
Unlike Java's ForkJoin, the current v0.1 task-stealing algorithm does not let two workers take tasks from the same queue at the same time. Instead, it locks the entire queue whenever a task is taken, so concurrency performance is poor. The plan is to let two workers read from one queue at the same time, and only when two workers try to take an element from a queue of length 1 simultaneously, upgrade the optimistic lock to a pessimistic lock to control access.
Controlling the number of workers
Currently, workers are created as tasks are broken down, so splitting tasks too deeply can create a large number of workers. The next step is to study how Java's ForkJoin schedules thread resources.
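One possible way to bound worker growth, sketched under assumptions and not taken from Java's ForkJoinPool: a buffered channel used as a counting semaphore caps how many forked goroutines run at once, and when no slot is free the subtask runs inline on the caller's goroutine instead of waiting. Running inline matters, because a fork/join task that blocked waiting for a slot while its parent held one could deadlock. `limiter`, `fork` and `sum` are hypothetical names.

```go
package main

import "fmt"

// limiter caps how many forked goroutines may run at once, using a buffered
// channel as a counting semaphore.
type limiter struct{ slots chan struct{} }

func newLimiter(limit int) *limiter {
	return &limiter{slots: make(chan struct{}, limit)}
}

// fork runs f on a new goroutine if a slot is free; otherwise it runs f inline
// on the caller's goroutine, so a forking task never waits for a slot.
// The returned function joins: it waits until f has finished.
func (l *limiter) fork(f func()) (join func()) {
	select {
	case l.slots <- struct{}{}: // acquired a slot
		done := make(chan struct{})
		go func() {
			defer func() {
				<-l.slots // release the slot
				close(done)
			}()
			f()
		}()
		return func() { <-done }
	default: // no free slot: compute directly
		f()
		return func() {}
	}
}

// sum adds nums by divide and conquer, forking the left half through l.
func sum(l *limiter, nums []int) int {
	if len(nums) <= 16 {
		total := 0
		for _, n := range nums {
			total += n
		}
		return total
	}
	mid := len(nums) / 2
	var left int
	join := l.fork(func() { left = sum(l, nums[:mid]) })
	right := sum(l, nums[mid:])
	join() // after join returns, left is safe to read
	return left + right
}

func main() {
	l := newLimiter(4) // at most 4 forked goroutines at any moment
	nums := make([]int, 1000)
	for i := range nums {
		nums[i] = i + 1
	}
	fmt.Println(sum(l, nums)) // 500500
}
```

With a limit of 0 every fork runs inline, so the same code degrades to a plain sequential recursion.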