This is the 24th day of my participation in the August More Text Challenge
select
So let’s say I have two channels c1 and C2, and I want to take the values in both channels, and I want to take the values in whichever channel comes first. And that’s where select comes in
package main
import "fmt"
func main() {
var c1, c2 chan int // c1 and c2 is nil
select {
case n := <-c1:
fmt.Println("Receiver from c1: ", n)
case n := <- c2:
fmt.Println("Receiver from c2: ", n)
default:
fmt.Println("No value Received")
}
}
Copy the code
C1 and c2 are nil because they’re not initialized, but select works fine at this point, just doesn’t get the value, so we do the default part. As mentioned in the previous article, a channel sends or receives data blocking, but here it is like doing a non-blocking fetch
Select the use of
Here’s an example of how to use SELECT
package main
import (
"fmt"
"math/rand"
"time"
)
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
time.Sleep(
time.Duration(rand.Intn(1500)) *
time.Millisecond)
out <- i
i++
}
}()
return out
}
func main() {
var c1, c2 = generator(), generator()
for {
select {
case n := <-c1:
fmt.Println("Receiver from c1: ", n)
case n := <- c2:
fmt.Println("Receiver from c2: ", n)
}
}
}
Copy the code
Output result:
Receiver from c1: 0
Receiver from c2: 0
Receiver from c2: 1
Receiver from c1: 1
Receiver from c1: 2
Receiver from c2: 2
Receiver from c1: 3
Receiver from c2: 3
Receiver from c2: 4
Receiver from c2: 5
Receiver from c1: 4
Receiver from c1: 5
Receiver from c1: 6
Receiver from c2: 6
.......
Copy the code
The speed of the output data can be found that c1 and c2, who has the data, first select will obtain the data which channel first, if the two have at the same time, it will randomly choose a
Now make the following changes to the above code, adding a worker function that prints the values in the channel. Add a createWorker method to create a channel and open a Goroutine to print the values in the channel. With these two functions, instead of using Println to print a value in a select channel, create a channel with createWorker and assign the value from c1 or c2 to the created channel. The worker outputs the results. Details are as follows:
package main import ( "fmt" "math/rand" "time" ) func generator() chan int { out := make(chan int) go func() { i := 0 for { time.Sleep( time.Duration(rand.Intn(1500)) * time.Millisecond) out <- i i++ } }() return out } func worker(id int, c chan int) { for n := range c { fmt.Printf("worker %d, received %d\n", id, n) } } func createWorker(id int) chan<- int { c := make(chan int) go worker(id, c) return c } func main() { var c1, C2 = generator(), generator() w := createWorker(0) // Create a channel to receive values from c1 or c2 for {select {case n := <-c1: w <- n case n := <- c2: w <- n } } }Copy the code
Note: For a nil channel, if there is no default in the select, it will always block
A disadvantage of this approach is that after receiving a number (c1 or c2), the subsequent execution of w < -n is blocked (because channel send and receive are blocked). So, that’s not good, we can add another case in select, when we get from C1 or C2 to W. Therefore, we need to know if we got a value from c1 or c2, which is indicated by a variable hasValue
Make the following changes to the main function
func main() { var c1, c2 = generator(), Generator () var worker = createWorker(0) n := 0 hasValue := false// Indicates whether a value is obtained from C1 or c2. For {var activeWorker chan<- If hasValue {activeWorker = worker} select {case n = <-c1: hasValue = true case n = <- c2: hasValue = true case activeWorker <- n: hasValue = false } } }Copy the code
The above application does have a problem, but it is less likely to occur because the data in the activeWorker comes from C1 and C2, so the activeWorker may consume data at a different rate than c1 and C2 generate data. If the speed of data generation is too fast, for example, 1, 2 and 3 data are generated in one breath, and all of them are given to N, N will continuously collect, and the last N will be 3, so 1 and 2 cannot be output (you can mock this situation, so that the worker can print for a longer time and sleep for 5 seconds).
func worker(id int, c chan int) { for n := range c { time.Sleep(3 * time.Second) fmt.Printf("worker %d, received %d\n", Id, n)}} Output: worker 0, received 0 worker 0, received 5 worker 0, received 9 worker 0, received 11...... You can see that some of the data will jump off and not be outputCopy the code
So our solution is to queue up all the n’s, and the channels responsible for consumption processing can process them one by one. So the modified code looks like this:
func main() { var c1, c2 = generator(), Generator () var worker = createWorker(0) var values []int for {var activeWorker chan< -int // Because worker is a channel for sending data Var activeValue int if len(values) > 0 { ActiveWorker = worker activeValue = values[0]} select {case n := <-c1: values = append(values, n) case n := <- c2: values = append(values, n) case activeWorker <- activeValue: Values = values[1:]}}} Output:...... worker 0, received 2 worker 0, received 2 worker 0, received 3 worker 0, received 3 worker 0, received 4 worker 0, received 5 worker 0, received 4 worker 0, received 5 worker 0, received 6 worker 0, received 6 ......Copy the code
At this point, I found that even if the worker prints slowly, there will be no data loss (I omitted the before and after parts of the print results to show that the print results would be out of order).
Use of timer
There must be a lot of data stored in values, and we can look at how much data is stored in values. Today’s programs print data forever and never end. After(10 * time.second), which returns a channel.After 10 seconds, it will send a time to this channel, so that it can receive it in select. Exit after receiving, modify as follows:
func main() { var c1, c2 = generator(), Generator () var worker = createWorker(0) var values []int tm := time.after (10 * time.second) // This method returns a channel. Which means he's gonna be, For {var activeWorker chan< -int var activeValue int if len(values) > 0 { ActiveWorker = worker activeValue = values[0]} select {case n := <-c1: values = append(values, n) case n := <- c2: values = append(values, n) case activeWorker <- activeValue: values = values[1:] case <- tm: fmt.Println("Bye") return } } }Copy the code
We can also use the time.after method above, assuming that we consider a timeout if the data exceeds 800ms and has not been generated
func main() { var c1, c2 = generator(), Generator () var worker = createWorker(0) var values []int tm := time.after (10 * time.second) // This method returns a channel. Which means he's gonna be, For {var activeWorker chan< -int var activeValue int if len(values) > 0 { ActiveWorker = worker activeValue = values[0]} select {case n := <-c1: values = append(values, n) case n := <- c2: values = append(values, n) case activeWorker <- activeValue: values = values[1:] case <-tm: // FMT.Println("Bye") return case < time.after (800* time.millisecond):// Println("timeout")}}}Copy the code
If you’re worried about a backlog in Slice, you can add a timing feature that looks at the backlog every second. We can use the time.Tick method, which also returns a channel and sends data to the queue at specified intervals, which we can also receive in case
func main() { var c1, c2 = generator(), Generator () var worker = createWorker(0) var values []int tm := time.after (10 * time.second) // This method returns a channel. Which means he's gonna be, Tick := time.tick (time.second)// Send one data per Second to the returned channel for {var activeWorker chan<- Var activeValue int if len(values) > 0 {// As long as there are values in slice, ActiveWorker = worker activeValue = values[0]} select {case n := <-c1: values = append(values, n) case n := <- c2: values = append(values, n) case activeWorker <- activeValue: values = values[1:] case <-tm: // FMT.Println("Bye") return case < time.after (800* time.millisecond):// Println("timeout") case <-tick: Println("queue len: ", len(values))}}}Copy the code