In Go, the pipeline model is a way to increase concurrency and make full use of the machine's resources. This article shows how to build a pipeline and how to shut it down gracefully when its stages run concurrently.
What is the pipeline model
A pipeline consists of several stages. Each stage can be made up of one or more goroutines running the same function. The goroutines in a stage:
- receive values from upstream through an inbound channel
- process the values, usually producing new ones
- send the results downstream through an outbound channel
Every stage has an inbound and an outbound channel, with two exceptions. The first stage (the source, or producer) has only an outbound channel, and the last stage (the sink, or consumer) has only an inbound channel.
Squaring numbers with a pipeline
We will build a pipeline that computes the square of each element of a slice. The first stage starts a new goroutine that sends the slice elements to a channel and closes the channel when it is done:
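```go
// producer is the first stage: it sends each input value on out, then closes out.
func producer(input ...int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for _, k := range input {
			out <- k
		}
	}()
	return out
}
```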
The processing stage reads each value from its input channel, squares it, and sends the result to its output channel:
```go
// process squares every value it receives and closes out when input is exhausted.
func process(input <-chan int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for v := range input {
			out <- v * v
		}
	}()
	return out
}
```
The last stage reads each value and forwards it to its own output channel, which main then ranges over:
```go
// end forwards every value unchanged and closes out when input is exhausted.
func end(input <-chan int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for v := range input {
			out <- v
		}
	}()
	return out
}
```
The stages are wired together in the main function:
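```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	p := producer(1, 2, 3, 4, 5, 6)
	proc := process(p)
	e := end(proc)
	// main plus the three stage goroutines are running at this point
	fmt.Println("goroutines: ", runtime.NumGoroutine())
	for v := range e {
		fmt.Println(v)
	}
	// give the stage goroutines time to return after closing their channels
	time.Sleep(time.Second)
	fmt.Println("goroutines: ", runtime.NumGoroutine())
}
```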
Fan-out and fan-in
In the previous example, each stage started a single goroutine to receive, process, and send data. In practice, several goroutines can read from the same channel and process its values in parallel. This is called fan-out. A separate function then combines several channels into a single output channel. This is called fan-in.
In our example, we can call process several times on the same channel so that values are squared concurrently. We also add a new function, merge, that combines the channels returned by those calls into one:
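```go
import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// merge fans in: it copies values from every input channel onto one output channel.
func merge(inputs ...<-chan int64) <-chan int64 {
	out := make(chan int64)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	// start one forwarding goroutine per input channel
	for _, input := range inputs {
		go func(input <-chan int64) {
			defer wg.Done()
			for v := range input {
				out <- v
			}
		}(input)
	}
	// close out once every forwarding goroutine has finished
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	p := producer(1, 2, 3, 4, 5, 6)
	// fan out: two process goroutines read from the same channel p
	proc1 := process(p)
	proc2 := process(p)
	// fan in: merge the two output channels into one
	res := merge(proc1, proc2)
	fmt.Println("goroutines: ", runtime.NumGoroutine())
	for v := range res {
		fmt.Println(v)
	}
	time.Sleep(time.Second)
	fmt.Println("goroutines: ", runtime.NumGoroutine())
}
```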
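Shutting down the pipeline
In the previous example, main reads every value from res. If it stops early instead, the upstream stages block forever trying to send. Their channels are never closed and their goroutines never exit. The result is a goroutine leak: the blocked goroutines and the memory they hold are never released. For example, this main reads only one result: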
```go
func main() {
	p := producer(1, 2, 3, 4, 5, 6)
	proc1 := process(p)
	proc2 := process(p)
	res := merge(proc1, proc2)
	fmt.Println("goroutines: ", runtime.NumGoroutine())
	// read only one result and abandon the rest
	fmt.Println(<-res)
	time.Sleep(time.Second)
	// the stage goroutines are still blocked on their sends, so the count does not drop back to 1
	fmt.Println("goroutines: ", runtime.NumGoroutine())
}
```
When an error occurs, or when downstream no longer needs the remaining data, the stages must be told to stop so that they can close their channels, return, and release their resources. To do this, main creates a done channel and passes it to every stage. Each stage selects on done alongside its send. Closing done acts as a broadcast to all stages, because a receive from a closed channel never blocks:
```go
import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

func producer(done <-chan struct{}, input ...int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for _, k := range input {
			select {
			case out <- k:
			case <-done:
				// exit as soon as done is closed
				return
			}
		}
	}()
	return out
}

func process(done <-chan struct{}, input <-chan int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for v := range input {
			select {
			case out <- v * v:
			case <-done:
				return
			}
		}
	}()
	return out
}

func merge(done <-chan struct{}, inputs ...<-chan int64) <-chan int64 {
	out := make(chan int64)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, input := range inputs {
		go func(input <-chan int64) {
			defer wg.Done()
			for v := range input {
				select {
				case out <- v:
				case <-done:
					return
				}
			}
		}(input)
	}
	// close out once every forwarding goroutine has returned
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	done := make(chan struct{})
	// defer close(done) would also work, but it only runs when main returns
	p := producer(done, 1, 2, 3, 4, 5, 6)
	proc1 := process(done, p)
	proc2 := process(done, p)
	res := merge(done, proc1, proc2)
	fmt.Println("goroutines: ", runtime.NumGoroutine())
	fmt.Println(<-res)
	// signal every stage to stop
	close(done)
	time.Sleep(time.Second)
	fmt.Println("goroutines: ", runtime.NumGoroutine())
}
```
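The standard library's `context` package provides the same broadcast signal. `ctx.Done()` returns a channel that is closed when the context is cancelled, so it can replace the hand-made done channel. The sketch below shows that swap. It is an alternative to the article's code, not part of it. The `stopped` helper is hypothetical: it polls `runtime.NumGoroutine` for up to one second instead of sleeping for a fixed duration.

```go
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
	"time"
)

func producer(ctx context.Context, input ...int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for _, k := range input {
			select {
			case out <- k:
			case <-ctx.Done(): // closed when cancel() is called
				return
			}
		}
	}()
	return out
}

func process(ctx context.Context, input <-chan int64) <-chan int64 {
	out := make(chan int64)
	go func() {
		defer close(out)
		for v := range input {
			select {
			case out <- v * v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

func merge(ctx context.Context, inputs ...<-chan int64) <-chan int64 {
	out := make(chan int64)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, input := range inputs {
		go func(input <-chan int64) {
			defer wg.Done()
			for v := range input {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}(input)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// stopped reads one result, cancels the context, and reports whether every
// goroutine the pipeline started has exited within one second.
func stopped() bool {
	before := runtime.NumGoroutine()
	ctx, cancel := context.WithCancel(context.Background())
	p := producer(ctx, 1, 2, 3, 4, 5, 6)
	res := merge(ctx, process(ctx, p), process(ctx, p))
	<-res // consume a single result, then give up on the rest
	cancel()
	for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
		if runtime.NumGoroutine() <= before {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}

func main() {
	fmt.Println("all pipeline goroutines exited:", stopped())
}
```

One practical reason to choose `context` is that it is the conventional way to carry cancellation (and deadlines, via `context.WithTimeout`) across API boundaries in Go. A done channel works equally well inside a single package.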
Conclusion
The pipeline model can be viewed as a concurrency pattern: a task is split into subtasks that are processed concurrently, and the subtasks' results are then gathered into a single output. Go makes this pattern straightforward, because goroutines are cheap to create and channels pass data between them. In other programming languages, you might need to implement it with shared global variables protected by read-write locks.
Follow the WeChat official account "Luo Xiaoyan" to discuss and learn together.