If I were to tell someone about Golang, the first thing THAT comes to mind is not necessarily Goroutine, but channel.
Because of Channel, Goroutine power is a powerful tool.
If I could explain what channel does in one sentence, I would say
Chanel is a conduit that allows data to flow.
So how do you understand the flow of data? ++
Let’s say you need to do two time-consuming operations on 100 requests and then count the weighted results, and you need as many concurrent operations as possible to improve performance. Example code is as follows:
var multipleChan = make(chan int, 4)
var minusChan = make(chan int, 4)
var harvestChan = make(chan int, 4)
defer close(multipleChan)
defer close(minusChan)
defer close(harvestChan)
go func() {
fori:=1; i<=100; i++{ multipleChan <- i } }()for i:=0; i<4; i++{
go func() {
for data := range multipleChan {
minusChan <- data * 2
time.Sleep(10* time.Millisecond)
}
}()
go func() {
for data := range minusChan {
harvestChan <- data - 1
time.Sleep(10* time.Millisecond)
}
}()
}
var sum = 0
var index = 0
for data := range harvestChan{
sum += data
index++
if index == 100{
break
}
}
fmt.Println(sum)
Copy the code
Don’t laugh at the simplicity of this code; it’s a bit complicated when you consider error handling. For example, a link is encountered errors can be ignored, a link is encountered to terminate all operations; Plus, sometimes you only care about the first return value that satisfies the condition, and you need timeout processing.
Maybe I can write it once, but if I have to write it in many places, it’s really a big head.
Repeating code is the root of all evil, and Don’t repeat yourself is the first step to becoming a good engineer.
Thus, the ChannelX library was born!
Using this library to do the same thing, the code looks like this
var sum = 0
NewChannelStream(func(seedChan chan<- Result, quitChannel chan struct{}) {
fori:=1; i<=100; I ++{seedChan < -result {Data: I}} close(seedChan).Pipe(func(Result Result) Result{return Result{Data: result.Data.(int) * 2}
}).Pipe(func(result Result) Result {
return Result{Data: result.Data.(int) - 1}
}).Harvest(func(result Result) {
sum += result.Data.(int)
})
fmt.Println(sum)
Copy the code
I like the chain style, so I wrote it like this, and you could have split it.
But the point is, does the code feel silky like writing a NodeJS stream
In addition to the Pipe->Harvest combination, you can also implement the Pipe->Race, Pipe->Drain, Pipe->Cancel and other combination operations.
These complex examples can be implemented by referring to unit tests in the stream_test.go file without Posting code.
So how is this stream implemented? The core lies in the NewChannelStream and Pipe functions.
func NewChannelStream(seedFunc SeedFunc, optionFuncs ... OptionFunc) *ChannelStream { cs := &ChannelStream{ workers: runtime.NumCPU(), optionFuncs: optionFuncs, }for _, of := range optionFuncs {
of(cs)
}
if cs.quitChan == nil {
cs.quitChan = make(chan struct{})
}
cs.dataChannel = make(chan Item, cs.workers)
go func() {
inputChan := make(chan Item)
go seedFunc(inputChan, cs.quitChan)
loop:
for {
select {
case <-cs.quitChan:
break loop
case res, ok := <-inputChan:
if! ok {break loop
}
select {
case <-cs.quitChan:
break loop
default:
}
ifres.Err ! = nil { cs.errors = append(cs.errors, res.Err) }if! cs.hasError && res.Err ! = nil { cs.hasError =true
cs.dataChannel <- res
if cs.ape == stop {
cs.Cancel()
}
continue
}
if cs.hasError && cs.ape == stop {
continue
}
cs.dataChannel <- res
}
}
safeCloseChannel(cs.dataChannel)
}()
returncs } func (p *ChannelStream) Pipe(dataPipeFunc PipeFunc, optionFuncs ... OptionFunc) *ChannelStream { seedFunc := func(dataPipeChannel chan<- Item, quitChannel chan struct{}) { wg := &sync.WaitGroup{} wg.Add(p.workers)for i := 0; i < p.workers; i++ {
go func() {
defer wg.Done()
loop:
for {
select {
case <-quitChannel:
break loop
case data, ok := <-p.dataChannel:
if! ok {break loop
}
select {
case <-quitChannel:
break loop
default:
}
dataPipeChannel <- dataPipeFunc(data)
}
}
}()
}
go func() { wg.Wait() safeCloseChannel(dataPipeChannel) }() } mergeOptionFuncs := make([]OptionFunc, len(p.optionFuncs)+len(optionFuncs)+1) copy(mergeOptionFuncs[0:len(p.optionFuncs)], p.optionFuncs) copy(mergeOptionFuncs[len(p.optionFuncs):], optionFuncs) mergeOptionFuncs[len(p.optionFuncs)+len(optionFuncs)] = passByQuitChan(p.quitChan) // This line ensures that there is a unique quitChan in the streamreturn NewChannelStream(seedFunc, mergeOptionFuncs...)
}
Copy the code
There’s a lot of code to look at, except for initialization, error handling, and exit handling, and the core is the flow of data through channels.
First, a new inputChan in NewChannelStream is passed to seedFunc, and then the data is passed to dataChannel via seedChan(inputChan).
Then, when Pipe is called, the Pipe function creates a seedFunc of its own from the dataChannel of the previous channelStream to the dataPipeChannel. The Pipe’s seedFunc is passed to NewChannelStream, creating a NewChannelStream object. In this case, inputChan is the dataPipeChannel in the Pipe. The entire data stream is strung together as follows:
inputChan(seedChan)->dataChannel->inputChan(dataPipeChannel)->dataChannel->….
MultipleChan and minusChan are two dataChannel corresponding to multipleChan and minusChan respectively. The extra two inputChan are the extra cost of using this library.
Original is not easy, your support is my biggest encouragement, welcome to Channelx point a star! 🙂
To continue, ChannelX will continue to add a variety of common scenarios channel implementation, please look forward to…