Go can use the pipeline model to improve concurrency and make full use of multi-core CPUs. This article introduces how to build a pipeline and how to exit gracefully when its goroutines run concurrently.

What is the pipeline model

A pipeline consists of several stages, and each stage can be run by one or more goroutines that perform the same function. The goroutines in each stage:

  • Receive data from the upstream channel
  • Process the data, usually producing new data
  • Send the processed data downstream through a channel

Every stage has both an inbound and an outbound channel, with two exceptions. The first stage only sends, so it has only an outbound channel. The last stage only receives, so it has only an inbound channel.
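The three properties above can be captured in a single generic helper. The sketch below is not from the original article. `stage` and `source` are hypothetical names, and the code assumes Go 1.18 or later for type parameters.

```go
package main

import "fmt"

// stage is a hypothetical helper: it receives from an upstream channel,
// applies f to each value, and sends the result downstream on a new channel.
func stage[In, Out any](in <-chan In, f func(In) Out) <-chan Out {
	out := make(chan Out)
	go func() {
		// Closing out tells the next stage that no more values are coming.
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}()
	return out
}

// source is a hypothetical first stage: it has only an outbound channel.
func source(values ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

func main() {
	doubled := stage(source(1, 2, 3), func(v int) int { return v * 2 })
	labeled := stage(doubled, func(v int) string { return fmt.Sprintf("v=%d", v) })
	// main acts as the last stage: it only receives.
	for s := range labeled {
		fmt.Println(s)
	}
}
```

Each stage owns its output channel and closes it when done. Because of this, a downstream `range` loop ends on its own.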

Using a pipeline to compute squares

The pipeline is introduced through an example that squares each element of a slice. The first stage starts a new goroutine that sends the slice's elements to a channel:

func producer(input ...int64) <-chan int64 {
    out := make(chan int64)
    go func() {
        // Close out once every input is sent so downstream range loops end
        defer close(out)
        for _, k := range input {
            out <- k
        }
    }()
    return out
}

In the processing stage, values are read from the channel. Each value is squared, and the result is sent to the output channel:

func process(input <-chan int64) <-chan int64 {
    out := make(chan int64)
    go func() {
        defer close(out)
        for v := range input {
            out <- v * v
        }
    }()
    return out
}

The end stage forwards every value it receives to its output channel:

func end(input <-chan int64) <-chan int64 {
    out := make(chan int64)
    go func() {
        defer close(out)
        for v := range input {
            out <- v
        }
    }()
    return out
}

Call the stages from the main function:

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    p := producer(1, 2, 3, 4, 5, 6)
    proc := process(p)
    e := end(proc)
    fmt.Println("goroutines: ", runtime.NumGoroutine())
    for v := range e {
        fmt.Println(v)
    }
    // Give the stage goroutines time to exit before counting again
    time.Sleep(time.Second)
    fmt.Println("goroutines: ", runtime.NumGoroutine())
}
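As a quick check of this pipeline, the sketch below replaces the printing loop with `collect`, a hypothetical sink that drains the output channel into a slice. It assumes the `producer` and `process` functions above, which are repeated so the file compiles on its own. Because each stage has exactly one goroutine, values come out in the order they were produced.

```go
package main

import "fmt"

func producer(input ...int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for _, k := range input {
			out <- k
		}
	}()
	return out
}

func process(input <-chan int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for v := range input {
			out <- v * v
		}
	}()
	return out
}

// collect is a hypothetical sink: it receives until the channel is closed.
// Draining completely lets every upstream goroutine finish and exit.
func collect(input <-chan int64) []int64 {
	var res []int64
	for v := range input {
		res = append(res, v)
	}
	return res
}

func main() {
	fmt.Println(collect(process(producer(1, 2, 3, 4, 5, 6)))) // prints [1 4 9 16 25 36]
}
```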

Fan-in and fan-out

In the example in the previous section, each stage used a single goroutine to receive, process, and send data. In production, several goroutines can read from the same channel and share the processing work (fan-out). The outputs of several channels can then be merged into a single channel (fan-in).

Applied to the previous example, we can call process several times on the same input channel so that the data is processed concurrently. merge is a new function that receives from the output channels of all those goroutines and combines them into one channel:

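// merge fans in: it copies values from every input channel into one output channel
func merge(inputs ...<-chan int64) <-chan int64 {
    out := make(chan int64)
    var wg sync.WaitGroup // requires "sync" in the import list
    wg.Add(len(inputs))
    for _, input := range inputs {
        // One copying goroutine per input channel
        go func(input <-chan int64) {
            defer wg.Done()
            for v := range input {
                out <- v
            }
        }(input)
    }
    go func() {
        // Close out only after every input has been fully copied
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    p := producer(1, 2, 3, 4, 5, 6)
    // Fan out: two process stages read from the same channel
    proc1 := process(p)
    proc2 := process(p)
    // Fan in: merge both outputs into one channel
    res := merge(proc1, proc2)
    fmt.Println("goroutines: ", runtime.NumGoroutine())
    for v := range res {
        fmt.Println(v)
    }
    time.Sleep(time.Second)
    fmt.Println("goroutines: ", runtime.NumGoroutine())
}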

Exiting early

In the example in the previous section, the main function might not consume every value in res. In that case the goroutines further up the pipeline block forever on their sends. Their channels are never closed, the goroutines never exit, and their resources leak:

func main() {
    p := producer(1, 2, 3, 4, 5, 6)
    proc1 := process(p)
    proc2 := process(p)
    res := merge(proc1, proc2)
    fmt.Println("goroutines: ", runtime.NumGoroutine())
    // Receive only the first result; the remaining sends block forever
    fmt.Println(<-res)
    time.Sleep(time.Second)
    // The count stays high because the blocked goroutines cannot exit
    fmt.Println("goroutines: ", runtime.NumGoroutine())
}

An error may occur, or the remaining upstream data may no longer be needed. In either case a channel can tell the goroutines in every stage to stop: the channel is closed, each goroutine returns, and system resources are released. The main function creates this done channel and passes it to the goroutines of every stage:
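A single close can stop every stage because a receive from a closed channel never blocks. It returns the zero value immediately, for every receiver, whereas one send wakes only one receiver. The hypothetical `waitersStopped` helper below demonstrates this with n goroutines that all wait on the same channel.

```go
package main

import (
	"fmt"
	"sync"
)

// waitersStopped is a hypothetical helper: it starts n goroutines that block
// on done, closes done once, and reports how many of them observed the signal.
func waitersStopped(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // returns as soon as done is closed
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}
	close(done) // one close releases every waiting goroutine
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println("stopped:", waitersStopped(3)) // prints "stopped: 3"
}
```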

func producer(done <-chan struct{}, input ...int64) <-chan int64 {
    out := make(chan int64)
    go func() {
        defer close(out)
        for _, k := range input {
            select {
            case out <- k:
            case <-done: // exit as soon as done is closed
                return
            }
        }
    }()
    return out
}

func process(done <-chan struct{}, input <-chan int64) <-chan int64 {
    out := make(chan int64)
    go func() {
        defer close(out)
        for v := range input {
            select {
            case out <- v * v:
            case <-done:
                return
            }
        }
    }()
    return out
}

func merge(done <-chan struct{}, inputs ...<-chan int64) <-chan int64 {
    out := make(chan int64)
    var wg sync.WaitGroup
    wg.Add(len(inputs))
    for _, input := range inputs {
        go func(input <-chan int64) {
            defer wg.Done()
            for v := range input {
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }(input)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    done := make(chan struct{})
    // defer close(done) would also work and covers every return path
    p := producer(done, 1, 2, 3, 4, 5, 6)
    proc1 := process(done, p)
    proc2 := process(done, p)
    res := merge(done, proc1, proc2)
    fmt.Println("goroutines: ", runtime.NumGoroutine())
    fmt.Println(<-res)
    // Closing done broadcasts the stop signal to every stage
    close(done)
    time.Sleep(time.Second)
    fmt.Println("goroutines: ", runtime.NumGoroutine())
}
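The standard library's `context` package provides the same signal. `ctx.Done()` returns a channel that is closed when the `cancel` function from `context.WithCancel` is called. The sketch below swaps the hand-made done channel for a context. This is an alternative, not the article's code, and `firstThenCancel` is a hypothetical helper. Only one `process` stage is used so that the first value is predictably 1.

```go
package main

import (
	"context"
	"fmt"
)

func producer(ctx context.Context, input ...int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for _, k := range input {
			select {
			case out <- k:
			case <-ctx.Done(): // closed when cancel is called
				return
			}
		}
	}()
	return out
}

func process(ctx context.Context, input <-chan int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for v := range input {
			select {
			case out <- v * v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstThenCancel is a hypothetical helper: it takes one result, cancels the
// pipeline, then drains res until it is closed to confirm every stage exited.
func firstThenCancel() (first int64, drained int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // calling cancel more than once is safe
	res := process(ctx, producer(ctx, 1, 2, 3, 4, 5, 6))
	first = <-res
	cancel()
	for range res {
		drained++ // a few values may still arrive before the stages see the signal
	}
	return first, drained
}

func main() {
	first, drained := firstThenCancel()
	fmt.Println("first:", first, "received after cancel:", drained)
}
```

A context can also carry deadlines via `context.WithTimeout`, which is one reason it is common for this kind of cancellation in larger programs.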

Conclusion

The pipeline model can be thought of as a set of concurrency patterns. A task is broken down into sub-tasks that are processed concurrently, and their results are finally gathered together for output. In Go, goroutines are cheap to create, and channels handle the communication between them. In other programming languages, you might need to implement this pattern with shared global variables and read-write locks.

Welcome to follow the WeChat official account “Luo Xiaoyan” for discussion and learning.