Preface
- Simplify concurrent development
- The code below comes from Big Bin
1. Simple example
- Convert each number in the slice to its square
- The process is broken down into three steps:
- Produce information: producer() traverses the slice
- Process information: square() calculates the square
- Consume information: main() consumes the results
1. Produce information
func producer(nums ...int) <-chan int {
    // Create a buffered channel
    out := make(chan int, 10)
    // Send the data into the channel from a goroutine
    go func() {
        defer close(out) // Close the channel once every value has been sent
        for _, num := range nums {
            out <- num
        }
    }()
    return out
}
2. Process information
func square(inCh <-chan int) <-chan int {
    out := make(chan int, 10)
    // Square each incoming value in a goroutine
    go func() {
        defer close(out) // Close the output once the input is drained
        for n := range inCh {
            out <- n * n
        }
    }()
    return out
}
3. Consume information
// Requires import "fmt" at the top of the file
func main() {
    // First feed the data into the channel
    in := producer(1, 2, 3, 4)
    // Process the data
    ch := square(in)
    // Consume the data
    for ret := range ch {
        fmt.Printf("%3d", ret)
    }
}
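Each stage above runs in a single goroutine, and a channel delivers values in the order they were sent, so the squares come out in input order. The following is a minimal sketch that chains the three steps and gathers the results into a slice. The helper drain() is hypothetical and not part of the original code.

```go
package main

import "fmt"

// producer sends each number into a buffered channel and closes it when done.
func producer(nums ...int) <-chan int {
	out := make(chan int, 10)
	go func() {
		defer close(out)
		for _, num := range nums {
			out <- num
		}
	}()
	return out
}

// square reads numbers from inCh and sends their squares downstream.
func square(inCh <-chan int) <-chan int {
	out := make(chan int, 10)
	go func() {
		defer close(out)
		for n := range inCh {
			out <- n * n
		}
	}()
	return out
}

// drain is a hypothetical helper: it reads ch until it is closed
// and returns every value it received, in arrival order.
func drain(ch <-chan int) []int {
	var results []int
	for v := range ch {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(drain(square(producer(1, 2, 3, 4)))) // prints [1 4 9 16]
}
```

Collecting into a slice instead of printing inside the loop makes the pipeline's result easy to compare against an expected value.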
Optimizing with the fan-in and fan-out models
- Fan-out: multiple goroutines read data from the same channel until it is closed
- Fan-in: one goroutine reads data from multiple channels until those channels are closed
1. Fan-out and fan-in in practice
1. The producer producer() and the message-processing stage square() stay the same
func producer(nums ...int) <-chan int {
    // Create a buffered channel
    out := make(chan int, 10)
    // Send the data into the channel from a goroutine
    go func() {
        defer close(out) // Close the channel once every value has been sent
        for _, num := range nums {
            out <- num
        }
    }()
    return out
}
func square(inCh <-chan int) <-chan int {
    out := make(chan int, 10)
    // Square each incoming value in a goroutine
    go func() {
        defer close(out) // Close the output once the input is drained
        for n := range inCh {
            out <- n * n
        }
    }()
    return out
}
2. Add merge(), which funnels the output of multiple square() stages into a single channel for the consumer to read (fan-in)
// Requires import "sync" at the top of the file
func merge(cs ...<-chan int) <-chan int {
    out := make(chan int, 10)
    // The WaitGroup counts the collector goroutines that are still running
    var wg sync.WaitGroup
    // collect forwards every value from one input channel to out
    // The input channel is receive-only
    collect := func(in <-chan int) {
        defer wg.Done()
        for n := range in {
            out <- n
        }
    }
    wg.Add(len(cs))
    // Fan-in
    for _, c := range cs {
        go collect(c)
    }
    // Wrong: waiting here directly can deadlock, because the collectors block
    // writing to out once its buffer is full while main is not yet reading
    // wg.Wait()
    // close(out)
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
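To see why merge() waits in a separate goroutine, here is a sketch of the "wrong" variant from the comment above. It assumes an unbuffered out channel so that the problem shows up with a single value; with the buffered channel used above, the same thing happens once more values arrive than the buffer can hold. The names mergeBlocking() and mergeGetsStuck() are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// mergeBlocking is a deliberately broken variant of merge():
// it waits for the collectors before handing out to the caller.
func mergeBlocking(cs ...<-chan int) <-chan int {
	out := make(chan int) // unbuffered (assumption): every send needs a reader
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(in <-chan int) {
			defer wg.Done()
			for n := range in {
				out <- n // blocks: nobody can read out before it is returned
			}
		}(c)
	}
	wg.Wait() // never returns while a collector is stuck on the send above
	close(out)
	return out
}

// mergeGetsStuck reports whether mergeBlocking fails to return within 100ms.
func mergeGetsStuck() bool {
	in := make(chan int, 1)
	in <- 1
	close(in)

	done := make(chan struct{})
	go func() {
		mergeBlocking(in)
		close(done)
	}()
	select {
	case <-done:
		return false
	case <-time.After(100 * time.Millisecond):
		return true
	}
}

func main() {
	fmt.Println("stuck:", mergeGetsStuck()) // prints "stuck: true"
}
```

The timeout is only there so the demonstration can finish. In a real program the caller would simply hang, which is why wg.Wait() and close(out) are moved into their own goroutine.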
3. Modify main() to start three square() stages, so that the single producer() is read by multiple square() stages (fan-out)
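// Requires import "fmt" at the top of the file
func main() {
    in := producer(1, 2, 3, 4)
    // Fan-out: start three square() goroutines reading from the same channel
    c1 := square(in)
    c2 := square(in)
    c3 := square(in)
    // Consumer: read the merged results
    for ret := range merge(c1, c2, c3) {
        fmt.Printf("%3d", ret)
    }
}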
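Once several square() goroutines share one input channel, the order in which results reach the consumer is no longer guaranteed. The following is a minimal, self-contained sketch of the whole fan-out and fan-in program that sorts the collected results so they can be checked. The helper squaresFanOut() and its workers parameter are hypothetical additions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func producer(nums ...int) <-chan int {
	out := make(chan int, 10)
	go func() {
		defer close(out)
		for _, num := range nums {
			out <- num
		}
	}()
	return out
}

func square(inCh <-chan int) <-chan int {
	out := make(chan int, 10)
	go func() {
		defer close(out)
		for n := range inCh {
			out <- n * n
		}
	}()
	return out
}

// merge forwards every value from the input channels to one output channel
// and closes it after all inputs are drained.
func merge(cs ...<-chan int) <-chan int {
	out := make(chan int, 10)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(in <-chan int) {
			defer wg.Done()
			for n := range in {
				out <- n
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// squaresFanOut is a hypothetical helper: it fans out to the given number
// of square() stages, merges their output, and returns the sorted results.
func squaresFanOut(workers int, nums ...int) []int {
	in := producer(nums...)
	stages := make([]<-chan int, 0, workers)
	for i := 0; i < workers; i++ {
		stages = append(stages, square(in))
	}
	var results []int
	for ret := range merge(stages...) {
		results = append(results, ret)
	}
	sort.Ints(results) // arrival order is not deterministic, so sort first
	return results
}

func main() {
	fmt.Println(squaresFanOut(3, 1, 2, 3, 4)) // prints [1 4 9 16]
}
```

Without the sort, the same four values are still produced, but their order can differ from run to run.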
2. Optimizing the fan pattern
- Optimize for the scenario at hand: find the program's actual bottleneck and address that
- In general, though, buffered channels are recommended over unbuffered ones
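The mechanical difference behind that recommendation can be sketched as follows: a send on an unbuffered channel needs a receiver that is ready, while a buffered channel accepts sends until its buffer is full. This sketch only shows that behaviour and is not a benchmark. The helpers trySend() and acceptedWithoutReader() are hypothetical.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking send
// and reports whether the channel accepted the value.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // a plain send would have blocked here
	}
}

// acceptedWithoutReader counts how many of the attempted sends succeed
// while no goroutine is receiving from ch.
func acceptedWithoutReader(ch chan<- int, attempts int) int {
	accepted := 0
	for i := 0; i < attempts; i++ {
		if trySend(ch, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println("unbuffered:", acceptedWithoutReader(make(chan int), 5))     // prints "unbuffered: 0"
	fmt.Println("buffered(3):", acceptedWithoutReader(make(chan int, 3), 5)) // prints "buffered(3): 3"
}
```

In a pipeline this means a stage with a buffered output can keep working for a while even when the next stage is momentarily busy. Whether that helps in practice depends on where the bottleneck is.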
Conclusion
- These are study notes; I recommend reading the original article
- Thanks for reading