Preface

Many companies are building a Go (golang) stack. Have you ever wondered why? One reason is that Go is well suited to middleware. Another is that Go's concurrency support is better, which is what we usually mean by "high concurrency". That concurrency support depends on coroutines (goroutines), but coroutines cannot be used without restraint: they need to be managed, and the usual way to manage them is a coroutine pool. A coroutine pool is not as mysterious as it sounds, and today we are going to unveil it step by step. If you have never worked with Go's coroutines, it doesn't matter. I will try to write in detail.

Goroutine (coroutines)

Let’s start with a simple example

package main

import (
	"fmt"
	"time"
)

func go_worker(name string) {
	for i := 0; i < 5; i++ {
		fmt.Println("my name is", name)
		time.Sleep(1 * time.Second)
	}
	fmt.Println(name, "all done")
}

func main() {
	go_worker("123")
	go_worker("456")
	for i := 0; i < 5; i++ {
		fmt.Println("I am main")
		time.Sleep(1 * time.Second)
	}
}

When we run this code, everything executes sequentially:

go_worker("123") -> go_worker("456") -> the loop in main

The output is as follows

my name is 123
my name is 123
my name is 123
my name is 123
my name is 123
123 all done
my name is 456
my name is 456
my name is 456
my name is 456
my name is 456
456 all done
I am main
I am main
I am main
I am main
I am main

If one task is slow, it holds up everything after it, and you can imagine what that does to the efficiency of the whole program. Go supports coroutines, though, so we can change the code above:

package main

import (
	"fmt"
	"time"
)

func go_worker(name string) {
	for i := 0; i < 5; i++ {
		fmt.Println("my name is", name)
		time.Sleep(1 * time.Second)
	}
	fmt.Println(name, "all done")
}

func main() {
	go go_worker("123") // start a goroutine for the first worker
	go go_worker("456") // start a goroutine for the second worker
	for i := 0; i < 5; i++ {
		fmt.Println("I am main")
		time.Sleep(1 * time.Second)
	}
}

We add the go keyword in front of each go_worker call, so the tasks change from serial to asynchronous execution. The output looks something like this (the exact interleaving varies from run to run):

I am main
my name is 456
my name is 123
I am main
my name is 456

As you can see, each task does its own work without blocking the others, and efficiency is greatly improved. That is the goroutine.
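
One detail worth noting: in the example above, main only outlives the workers because of its own sleep loop. Once main returns, the program exits whether or not the goroutines have finished. The following is a minimal sketch of waiting for them explicitly, assuming the standard library's sync.WaitGroup, which the original example does not use. runWorkers is a hypothetical helper name chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers is a hypothetical helper: it starts one goroutine per name,
// waits for all of them to finish, and returns how many completed.
func runWorkers(names []string) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done int
	)
	for _, name := range names {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			fmt.Println("my name is", name)
			mu.Lock()
			done++
			mu.Unlock()
		}(name)
	}
	wg.Wait() // block until every goroutine has called Done
	return done
}

func main() {
	fmt.Println("finished:", runWorkers([]string{"123", "456"}))
}
```

The order of the "my name is" lines is still not guaranteed; the WaitGroup only guarantees that all of them are printed before "finished".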

Channel

With coroutines comes a new question: how do coroutines communicate with each other? This leads to the idea of a channel (a pipe), which is very simple: one side puts data in and the other side takes data out.

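package main

import (
	"fmt"
	"time"
)

func worker(c chan int) {
	num := <-c // read a value from the channel
	fmt.Println("c:", num)
}

func main() {
	// create a channel of type int
	c := make(chan int)
	// start a goroutine that reads from the channel
	go worker(c)
	c <- 2 // write 2 into the channel
	// give the worker a moment to print before main exits
	time.Sleep(100 * time.Millisecond)
}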

In the example above, the main function creates a channel of type int and writes 2 into it. The worker then reads from channel c and gets 2.
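
The same put-in, take-out idea extends to a stream of values. The sketch below is an illustration rather than the article's own code: one goroutine sends several numbers and then closes the channel, and the receiver loops with range until the channel is drained. sumFromChannel is a hypothetical helper name.

```go
package main

import "fmt"

// sumFromChannel is a hypothetical helper: a goroutine sends 1..n into a
// channel, and the caller sums every value it receives.
func sumFromChannel(n int) int {
	c := make(chan int)
	go func() {
		for i := 1; i <= n; i++ {
			c <- i
		}
		close(c) // closing the channel ends the receiver's range loop
	}()
	sum := 0
	for v := range c {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println("sum:", sumFromChannel(5)) // prints "sum: 15"
}
```

Ranging over a channel until it is closed is the same pattern a pool worker uses to keep pulling tasks from its queue.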

Problems caused by coroutines

Since it is so easy to start coroutines in Go, are there any pitfalls?

As we saw above, in real systems different pieces of business logic run on different goroutines. At the micro level, however, a CPU core still executes instructions serially, one at a time; it just does so very quickly. When too many goroutines compete for the CPU, it has to switch between them more often, and every switch costs performance. So the main function of a coroutine pool is to manage goroutines and limit how many of them exist.
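
To make "limit the number of goroutines" concrete, here is a minimal sketch that uses a buffered channel as a counting semaphore. This is a simpler, different technique from the worker pool this article builds, shown only to illustrate the idea of a cap. peakConcurrency is a hypothetical helper name, and the 5 ms sleep is a stand-in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency is a hypothetical helper: it runs `tasks` short jobs,
// lets at most `limit` of them run at once, and reports the highest number
// that were ever running together.
func peakConcurrency(tasks, limit int) int {
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` jobs are already running
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(5 * time.Millisecond) // stand-in for real work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak within limit:", peakConcurrency(10, 3) <= 3)
}
```

However many tasks are submitted, no more than `limit` goroutines are ever running at once, which keeps the switching overhead bounded.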

Implementation of coroutine pool

  • First, incoming tasks are written directly to EntryChannel, which then passes them on to JobsChannel.

  • Then we start three coroutines (not necessarily three; three is just an example) that read tasks from JobsChannel and process them.

  • EntryChannel and JobsChannel are kept separate for decoupling. EntryChannel serves purely as the entry point, while JobsChannel can support more in-depth processing such as task priorities or locking and unlocking.

Code implementation

Now that the principle is clear, let’s look at the code implementation

First, let's deal with tasks. A task is simply some piece of work in the business; it needs to be something we can create and then execute. The code is as follows:

package main

import (
	"fmt"
	"time"
)

// Task wraps one unit of business work.
type Task struct {
	f func() error // a Task must contain a specific piece of business logic
}

// NewTask creates a Task.
func NewTask(arg_f func() error) *Task {
	t := Task{
		f: arg_f,
	}
	return &t
}

// Execute runs the Task.
func (t *Task) Execute() {
	t.f() // call the business method already bound in the Task
}

Now let’s define the coroutine pool

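type Pool struct {
	EntryChannel chan *Task // external entry point for tasks
	WorkerNum    int        // number of workers in the pool
	JobsChannel  chan *Task // internal task queue
}

// NewPool creates a pool with cap workers.
func NewPool(cap int) *Pool {
	p := Pool{
		EntryChannel: make(chan *Task),
		JobsChannel:  make(chan *Task),
		WorkerNum:    cap,
	}
	return &p
}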

The coroutine pool needs to create workers, which then keep taking tasks from the internal task queue, JobsChannel, and executing them:

// worker keeps taking tasks from JobsChannel and executing them.
func (p *Pool) worker(workerId int) {
	for task := range p.JobsChannel {
		task.Execute()
		fmt.Println("workerId", workerId, "task executed successfully")
	}
}

// ReceiveTask pushes a task into EntryChannel.
func (p *Pool) ReceiveTask(t *Task) {
	p.EntryChannel <- t
}

// Run starts the pool.
func (p *Pool) Run() {
	// 1: start a fixed number of workers
	for i := 0; i < p.WorkerNum; i++ {
		go p.worker(i)
	}

	// 2: take tasks from EntryChannel and send them to JobsChannel
	for task := range p.EntryChannel {
		p.JobsChannel <- task
	}

	// 3: the loop above only ends once EntryChannel has been closed,
	// so only JobsChannel still needs closing here (closing
	// EntryChannel a second time would panic)
	close(p.JobsChannel)
}

And then we look at the main function

func main() {
	// create a Task
	t := NewTask(func() error {
		fmt.Println(time.Now())
		return nil
	})

	// create a coroutine pool with 3 workers
	p := NewPool(3)

	// start a coroutine that keeps pushing the task to EntryChannel
	go func() {
		for {
			p.ReceiveTask(t)
		}
	}()

	// start the coroutine pool
	p.Run()
}
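
The main function above feeds tasks forever, so the pool never shuts down. As a hedged sketch of how a finite batch could be run to completion, the variant below has the producer close EntryChannel when it is done and adds a sync.WaitGroup so that Run returns only after the workers have drained JobsChannel. The wg field, the runTasks driver and the atomic counter are assumptions added for illustration and are not part of the article's code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Task struct {
	f func() error
}

func NewTask(f func() error) *Task { return &Task{f: f} }

func (t *Task) Execute() { t.f() }

type Pool struct {
	EntryChannel chan *Task
	JobsChannel  chan *Task
	WorkerNum    int
	wg           sync.WaitGroup // assumption: added to wait for workers
}

func NewPool(n int) *Pool {
	return &Pool{
		EntryChannel: make(chan *Task),
		JobsChannel:  make(chan *Task),
		WorkerNum:    n,
	}
}

func (p *Pool) worker() {
	defer p.wg.Done()
	for task := range p.JobsChannel {
		task.Execute()
	}
}

// Run starts the workers, forwards tasks until EntryChannel is closed,
// then closes JobsChannel and waits for the workers to finish.
func (p *Pool) Run() {
	for i := 0; i < p.WorkerNum; i++ {
		p.wg.Add(1)
		go p.worker()
	}
	for task := range p.EntryChannel {
		p.JobsChannel <- task
	}
	close(p.JobsChannel)
	p.wg.Wait()
}

// runTasks is a hypothetical driver: it submits n counting tasks to a pool
// of the given size and returns how many were executed.
func runTasks(n, workers int) int64 {
	var executed int64
	t := NewTask(func() error {
		atomic.AddInt64(&executed, 1)
		return nil
	})
	p := NewPool(workers)
	go func() {
		for i := 0; i < n; i++ {
			p.EntryChannel <- t
		}
		close(p.EntryChannel) // the producer signals that no more tasks are coming
	}()
	p.Run()
	return executed
}

func main() {
	fmt.Println("tasks executed:", runTasks(10, 3)) // prints "tasks executed: 10"
}
```

Closing from the producer side follows the usual Go convention that the sender, not the receiver, closes a channel.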

With the methods above, our simple coroutine pool is complete. Of course, this is not enough for real production use, but once you are fluent with these techniques you will be quite comfortable with Go. If you need the complete code, follow my public account "programmer a little rice" and reply "coroutine pool" to get it.