How I implemented a simple ForkJoin framework in Golang

First, the project address

go-fork-join

Basic principles

  • What is a ForkJoin

    I first came across the ForkJoin framework while learning about Java's parallel streams (Stream).

    ForkJoin is well suited to modern machines with multiple CPU cores. The framework breaks a large task into several smaller tasks that do not depend on each other. Using a divide-and-conquer strategy, it distributes these smaller tasks across the CPU cores.

    Many blogs already explain this well, so I will not go into detail here. This is one I used while learning:

    • juejin.cn/post/684490…
  • Task stealing

    Task stealing means that a worker takes tasks from the head of its own work queue and, when it has none, from the tail of another worker's work queue.

    On each poll, a worker first tries its own task queue. If that queue has no pending task, it picks another worker's queue at random and steals a task from it.

  • Joining the result of a subtask

    In Java, joining means repeatedly checking whether the task has finished and returning its result once it has. In Golang, channels make Java's Future pattern very easy to implement: Join only needs to read from the task's result channel. A receive on an empty channel blocks until a value arrives, and a capacity of 1 lets the worker deposit the result without waiting for a reader.

func (f *ForkJoinTask) Join() (bool, interface{}) {
	for {
		select {
		case data, ok := <-f.result:
			if ok {
				return true, data
			}
		case <-f.ctx.Done():
			panic(f.taskPool.err)
		}
	}
}

Core code

Task queue

There is not just one task queue but several. The pool loops over them, each time trying to take a task from one of the queues. If a task is found, it is wrapped in a struct. A worker is then obtained, and the wrapped task is sent to that worker's channel so that it is dispatched asynchronously.

func (fp *ForkJoinPool) run(ctx context.Context) {
	go func() {
		wId := int32(0)
		for {
			select {
			case <-ctx.Done():
				fmt.Printf("here is err")
				fp.err = fp.wp.err
				return
			default:
				hasTask, job, ft := fp.taskQueue.dequeueByTali(wId)
				if hasTask {
					fp.wp.Submit(ctx, &struct {
						T Task
						F *ForkJoinTask
						C context.Context
					}{T: job, F: ft, C: ctx})
				}
				// Move on to the next task queue, wrapping around at the end.
				wId = (wId + 1) % fp.cap
			}
		}
	}()
}

Get a Worker

During ForkJoin initialization, the Worker pool is initialized based on the number of CPU cores

func newPool(ctx context.Context, cancel context.CancelFunc) *Pool {
	...
	// Start one worker per CPU core.
	wCnt := runtime.NumCPU()
	for i := 0; i < wCnt; i++ {
		w := newWorker(p)
		w.run(ctx)
		p.workers = append(p.workers, w)
	}
	...
}

After that, every task must be executed by a worker. Each time a worker is needed, the pool first checks whether an idle worker is available. If so, it takes that one directly; otherwise it reuses a worker from the worker cache or creates a new one to accept the task.

func (p *Pool) retrieveWorker(ctx context.Context) *Worker {

	var w *Worker

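	// Hold the lock while reading p.workers so the length check and the pop cannot race.
	p.lock.Lock()
	idleWorker := p.workers

	if len(idleWorker) >= 1 {
		n := len(idleWorker) - 1
		w = idleWorker[n]
		p.workers = idleWorker[:n]
		p.lock.Unlock()
	} else {
		p.lock.Unlock()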
		if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
			w = cacheWorker.(*Worker)
		} else {
			w = &Worker{
				pool: p,
				job: make(chan *struct {
					T Task
					F *ForkJoinTask
					C context.Context
				}, 1),
			}
		}
		w.run(ctx)
	}
	return w
}

Worker

The worker is the object that actually executes tasks. Each worker is bound to a goroutine and has a channel through which it receives tasks asynchronously; the goroutine takes tasks off the channel and executes them. When a task finishes, the worker is returned to the worker pool.

func (w *Worker) run(ctx context.Context) {
	go func() {

		var tmpTask *ForkJoinTask

		// Recover from a panic in Compute, record it, and close the result channel.
		defer func() {
			if p := recover(); p != nil {
				w.pool.panicHandler(p)
				if tmpTask != nil {
					w.pool.err = p
					close(tmpTask.result)
				}
			}
		}()

		for {
			select {
			case <-ctx.Done():
				fmt.Println("An exception occurred and the task has stopped")
				return
			default:
				for job := range w.job {
					if job == nil {
						w.pool.workerCache.Put(w)
						return
					}
					tmpTask = job.F
					job.F.result <- job.T.Compute()
					w.pool.releaseWorker(w)
				}
			}
		}
	}()
}

Results

Areas for improvement

  • Task-stealing algorithm

    Unlike Java's ForkJoin, the current v0.1 task-stealing algorithm does not allow two workers to take tasks from the same queue at the same time. Instead, it locks the entire queue whenever a task is taken, so concurrency performance is poor. The goal is to let two workers read from one queue simultaneously, and to upgrade from an optimistic lock to a pessimistic lock only when two workers try to take an element from a queue of length 1 at the same time.

  • Controlling the number of workers

    Currently, workers are created as tasks are broken down, so splitting tasks too deeply can create a large number of workers. I need to keep studying how ForkJoin schedules thread resources.