A few days ago, I wrote an article explaining the Go channel specification. That post got a lot of upvotes on Reddit and HN, but there were also some criticisms of the design details of Go channels.
I’ve collected a few criticisms of the Go channel’s design and rules:
- There is no simple universal way to check if a channel is closed without modifying its state.
- Closing a closed channel panics, so it is dangerous to close a channel if the closer does not know whether the channel is already closed.
- Sending a value to a closed channel panics, so it is dangerous to send values to a channel if the sender does not know whether the channel is closed.
These criticisms look reasonable, but in fact they are not. It is true that there is no built-in function to check whether a channel has been closed.
If you can be sure that no value is (or will be) sent to the channel, then there is indeed an easy way to check whether the channel is closed. This approach was presented in the previous article. For completeness, the method is listed again below.
package main

import "fmt"

type T int

func IsClosed(ch <-chan T) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

func main() {
	c := make(chan T)
	fmt.Println(IsClosed(c)) // false
	close(c)
	fmt.Println(IsClosed(c)) // true
}
As mentioned above, this is not a generic way to check if a channel is closed.
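To see why the check is not generic, here is a minimal sketch, assuming a buffered chan int and a hypothetical helper named isClosedNaive that mirrors IsClosed. On an open channel that still holds a value, the check reports "closed" and also consumes that value, which is exactly the state modification the first criticism refers to.

```go
package main

import "fmt"

// isClosedNaive mirrors the IsClosed helper above, but for a plain chan int.
// The name is hypothetical and only used for this illustration.
func isClosedNaive(ch chan int) bool {
	select {
	case <-ch: // succeeds on a closed channel AND on a buffered value
		return true
	default:
	}
	return false
}

func main() {
	ch := make(chan int, 1)
	ch <- 42 // ch is open and holds one buffered value

	fmt.Println(isClosedNaive(ch)) // true, although ch was never closed
	fmt.Println(len(ch))           // 0: the check consumed the value
}
```

The helper is therefore only trustworthy under the precondition stated earlier: no value is, or ever will be, sent to the channel.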
In fact, even if there were a simple built-in closed function to check whether a channel has been closed, it would be of limited use, just as the built-in len function is of limited use for checking the number of values currently stored in a channel’s buffer. The reason is that the state of the channel may change right after such a call returns, so the returned value may no longer reflect the latest state of the channel. It is fine to stop sending values to a channel ch if closed(ch) returns true, but it is not safe to close the channel or keep sending values to it if closed(ch) returns false.
Channel closing principle
One general principle of using Go channels is: do not close a channel from the receiver side, and do not close a channel if it has multiple concurrent senders. In other words, we should only close a channel in a sender goroutine, and only if that sender is the only sender of the channel.
(Below, we refer to the above principle as the channel closing principle.)
Of course, this is not the universal principle for closing channels. The universal principle is: do not close (or send values to) closed channels. If a goroutine can guarantee that no other goroutine will close, or send values to, a non-closed non-nil channel any more, then that goroutine can close the channel safely. However, making such a guarantee as a receiver, or as one of many senders, of a channel usually takes a great deal of effort and often complicates the code. By contrast, the channel closing principle described above is much easier to follow.
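The two operations that the universal principle forbids can be observed directly. The following is a minimal sketch; panicMessage is a hypothetical helper that runs a function and returns the message of the runtime panic it raised.

```go
package main

import "fmt"

// panicMessage runs f and returns the message of the panic it raised,
// or "" if f returned normally. It is a hypothetical helper for this demo.
func panicMessage(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

func main() {
	ch := make(chan int, 1)
	close(ch)

	fmt.Println(panicMessage(func() { close(ch) })) // close of closed channel
	fmt.Println(panicMessage(func() { ch <- 1 }))   // send on closed channel

	// Receiving from a closed channel never panics; it yields the zero value.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```

Receiving is the only operation that stays safe after a close, which is why the principle puts the responsibility for closing on the sending side.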
Example of a crude channel closure
If you nevertheless want to close a channel from the receiver side, or from one of multiple senders, you can use the recover mechanism to prevent the possible panic from crashing the program. Here is an example (assuming the channel element type is T).
func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			// The return result can be altered
			// in a defer function call.
			justClosed = false
		}
	}()

	// assume ch != nil here.
	close(ch)   // panic if ch is closed
	return true // <=> justClosed = true; return
}
This solution clearly breaks the channel closing principle.
The same idea can be used to send values to potentially closed channels.
func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()

	ch <- value  // panic if ch is closed
	return false // <=> closed = false; return
}
Not only does this crude solution break the channel closing principle, but data races may also happen in the process.
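For reference, here is how the two helpers behave when called in sequence from a single goroutine. This is only a usage sketch; both functions are repeated so that it runs on its own, and the buffered channel is an assumption made so that the first send does not block.

```go
package main

import "fmt"

type T int

// SafeClose and SafeSend are repeated from above so this sketch runs on its own.
func SafeClose(ch chan T) (justClosed bool) {
	defer func() {
		if recover() != nil {
			justClosed = false
		}
	}()
	close(ch) // panics if ch is already closed
	return true
}

func SafeSend(ch chan T, value T) (closed bool) {
	defer func() {
		if recover() != nil {
			closed = true
		}
	}()
	ch <- value // panics if ch is closed
	return false
}

func main() {
	ch := make(chan T, 1) // buffered, so the first send does not block

	fmt.Println(SafeSend(ch, 1)) // false: the value was sent
	fmt.Println(SafeClose(ch))   // true: this call closed ch
	fmt.Println(SafeClose(ch))   // false: ch was already closed
	fmt.Println(SafeSend(ch, 2)) // true: ch is closed, nothing was sent
}
```

The sequential run is well defined; the concerns raised above apply when these calls happen concurrently from several goroutines.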
Example of politely closing a channel
Many people like to use sync.Once to close channels:
type MyChannel struct {
	C    chan T
	once sync.Once
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.once.Do(func() {
		close(mc.C)
	})
}
Of course, we can also use sync.Mutex to avoid multiple channel closures:
type MyChannel struct {
	C      chan T
	closed bool
	mutex  sync.Mutex
}

func NewMyChannel() *MyChannel {
	return &MyChannel{C: make(chan T)}
}

func (mc *MyChannel) SafeClose() {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	if !mc.closed {
		close(mc.C)
		mc.closed = true
	}
}

func (mc *MyChannel) IsClosed() bool {
	mc.mutex.Lock()
	defer mc.mutex.Unlock()
	return mc.closed
}
These approaches may be polite, but they may not avoid data races. Currently, the Go specification does not guarantee that no data race happens when a channel close operation and a channel send operation are executed concurrently. If a SafeClose function is called concurrently with a send operation on the same channel, a data race may happen (although such data races generally do no harm).
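As a usage sketch of the sync.Once variant, the following program lets several goroutines call SafeClose at the same time. The type onceCloser and the function closeConcurrently are hypothetical names, and chan int stands in for chan T. Note what the sketch does and does not show: the closers are protected from each other, but nothing here coordinates closers with senders.

```go
package main

import (
	"fmt"
	"sync"
)

// onceCloser is a cut-down version of the MyChannel type above
// (chan int instead of chan T); the name is hypothetical.
type onceCloser struct {
	C    chan int
	once sync.Once
}

// SafeClose closes C at most once, however many times it is called.
func (oc *onceCloser) SafeClose() {
	oc.once.Do(func() { close(oc.C) })
}

// closeConcurrently calls SafeClose from n goroutines and waits for all of them.
func closeConcurrently(oc *onceCloser, n int) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			oc.SafeClose()
		}()
	}
	wg.Wait()
}

func main() {
	oc := &onceCloser{C: make(chan int)}
	closeConcurrently(oc, 10)

	_, ok := <-oc.C
	fmt.Println(ok) // false: the channel is closed, and nothing panicked
}
```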
An example of gracefully closing channels
One drawback of the SafeSend function above is that a call to it cannot be used as a send operation following the case keyword in a select block.
Another drawback of the SafeSend and SafeClose functions above is that many people, myself included, find solutions based on panic/recover and the sync package inelegant. Next, I will introduce some pure-channel solutions that use neither panic/recover nor the sync package, for a variety of situations.
(In the following examples, sync.WaitGroup is used to make the examples complete. It may not be essential in practice.)
- M receivers, 1 sender, and the sender says “no more” by closing the data channel.
This is the simplest situation: just let the sender close the data channel when it does not want to send any more data.
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)

	// the sender
	go func() {
		for {
			if value := rand.Intn(Max); value == 0 {
				// The only sender can close the
				// channel at any time safely.
				close(dataCh)
				return
			} else {
				dataCh <- value
			}
		}
	}()

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// Receive values until dataCh is
			// closed and the value buffer queue
			// of dataCh becomes empty.
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}
- One receiver, N senders, the only receiver says “Please stop sending more” by closing one extra signal channel.
This situation is a little more complicated than the one above. We cannot let the receiver close the data channel to stop the data transfer, because doing so would break the channel closing principle. But we can let the receiver close an additional signal channel to tell the senders to stop sending values.
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the receiver of channel
	// dataCh, and its receivers are the
	// senders of channel dataCh.

	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// The try-receive operation is to try
				// to exit the goroutine as early as
				// possible. For this specified example,
				// it is not essential.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in the second select may be
				// still not selected for some loops if
				// the send to dataCh is also unblocked.
				// But this is acceptable for this
				// example, so the first select block
				// above can be omitted.
				select {
				case <-stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// the receiver
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			if value == Max-1 {
				// The receiver of channel dataCh is
				// also the sender of stopCh. It is
				// safe to close the stop channel here.
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	// ...
	wgReceivers.Wait()
}
As noted in the comments, for the additional signal channel, its sender is the receiver of the data channel. The additional signal channel is closed by its only sender, which upholds the channel closing principle.
In this example, the channel dataCh is never closed. Yes, channels do not have to be closed. A channel will eventually be garbage collected once no goroutine references it any more, whether it is closed or not. So the gracefulness of closing a channel here lies in not closing it.
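The core of this pattern is a send that is guarded by the signal channel inside a select block. A minimal sketch of just that piece follows; sendOrStop is a hypothetical helper name, and the demo deliberately has no receiver on dataCh so that the outcome is deterministic.

```go
package main

import "fmt"

// sendOrStop tries to send v on dataCh but gives up once stopCh is closed.
// It reports whether the value was sent. The name is hypothetical.
func sendOrStop(dataCh chan<- int, stopCh <-chan struct{}, v int) bool {
	select {
	case <-stopCh:
		return false
	case dataCh <- v:
		return true
	}
}

func main() {
	dataCh := make(chan int) // unbuffered and, in this demo, without any receiver
	stopCh := make(chan struct{})

	close(stopCh) // the receiver's "please stop" signal

	// The send can never proceed here, so the stopCh branch is the only ready one.
	fmt.Println(sendOrStop(dataCh, stopCh, 1)) // false
}
```

When both branches are ready at once, select picks one of them at random, which is why the senders in the full example add a try-receive on stopCh before the guarded send.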
- M receivers, N senders, any one of them says “let’s end the game” by notifying a moderator to close an additional signal channel
This is the most complicated situation. We cannot let any of the receivers or senders close the data channel. We also cannot let any of the receivers close an additional signal channel to notify all senders and receivers to quit the game. Doing either would break the channel closing principle. However, we can introduce a moderator role to close the additional signal channel. One trick in the following example is how the try-send operation is used to notify the moderator to close the additional signal channel.
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the moderator goroutine shown
	// below, and its receivers are all senders
	// and receivers of dataCh.
	toStop := make(chan string, 1)
	// The channel toStop is used to notify the
	// moderator to close the additional signal
	// channel (stopCh). Its senders are any senders
	// and receivers of dataCh, and its receiver is
	// the moderator goroutine shown below.
	// It must be a buffered channel.

	var stoppedBy string

	// moderator
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// Here, the try-send operation is
					// to notify the moderator to close
					// the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				// The try-receive operation here is to
				// try to exit the sender goroutine as
				// early as possible. Try-receive and
				// try-send select blocks are specially
				// optimized by the standard Go
				// compiler, so they are very efficient.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and for ever in theory) if the send
				// to dataCh is also non-blocking. If
				// this is unacceptable, then the above
				// try-receive operation is essential.
				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// Same as the sender goroutine, the
				// try-receive operation here is to
				// try to exit the receiver goroutine
				// as early as possible.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and forever in theory) if the receive
				// from dataCh is also non-blocking. If
				// this is not acceptable, then the above
				// try-receive operation is essential.
				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// Here, the same trick is
						// used to notify the moderator
						// to close the additional
						// signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
In this example, the channel closing principle is still upheld.
Note that the buffer size (capacity) of channel toStop is 1. This avoids losing the first notification if it is sent before the moderator goroutine is ready to receive from toStop.
We can also set the capacity of the toStop channel to the total number of senders and receivers, so that we no longer need a try-send select block to notify the moderator.
...
toStop := make(chan string, NumReceivers + NumSenders)
...
value := rand.Intn(Max)
if value == 0 {
	toStop <- "sender#" + id
	return
}
...
if value == Max-1 {
	toStop <- "receiver#" + id
	return
}
...
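The effect of the buffer on toStop can be checked in isolation. The sketch below assumes a hypothetical helper trySend that wraps the try-send select block, and it never starts a receiver, which models the moment before the moderator goroutine is ready.

```go
package main

import "fmt"

// trySend performs a non-blocking send and reports whether v was delivered.
// The name is hypothetical.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	// No goroutine is receiving yet, as if the moderator had not started.
	unbuffered := make(chan string)
	fmt.Println(trySend(unbuffered, "sender#1")) // false: the notification is lost

	buffered := make(chan string, 1)
	fmt.Println(trySend(buffered, "sender#1")) // true: kept in the buffer
	fmt.Println(trySend(buffered, "sender#2")) // false: dropped, one is enough
	fmt.Println(<-buffered)                    // sender#1
}
```

With capacity 1, the first notification always survives and later ones are dropped harmlessly. With a capacity equal to the number of possible notifiers, a plain send can never block, so the try-send is no longer needed.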
- A variation of the “M receivers, 1 sender” scenario: the close request is made by a third-party Goroutine
Sometimes the close signal must be made by a third-party goroutine. In this case, we can use an additional signal channel to notify the sender to close the data channel. For example:
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 100
	const NumThirdParties = 15

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	closing := make(chan struct{}) // signal channel
	closed := make(chan struct{})

	// The stop function can be called
	// multiple times safely.
	stop := func() {
		select {
		case closing <- struct{}{}:
			<-closed
		case <-closed:
		}
	}

	// some third-party goroutines
	for i := 0; i < NumThirdParties; i++ {
		go func() {
			r := 1 + rand.Intn(3)
			time.Sleep(time.Duration(r) * time.Second)
			stop()
		}()
	}

	// the sender
	go func() {
		defer func() {
			close(closed)
			close(dataCh)
		}()

		for {
			select {
			case <-closing:
				return
			default:
			}

			select {
			case <-closing:
				return
			case dataCh <- rand.Intn(Max):
			}
		}
	}()

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()
}
The idea used in the stop function was picked up from Roger Peppe’s comments.
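To see why the stop function can be called any number of times, here is a reduced sketch. The constructor newStopper is a hypothetical name, and a tiny goroutine stands in for the sender: it accepts the first request on closing and then closes the second channel, after which every other caller takes the second select branch.

```go
package main

import (
	"fmt"
	"sync"
)

// newStopper returns a stop function built like the one above, together with
// the channel that is closed once stopping is done. The name is hypothetical.
func newStopper() (stop func(), closed <-chan struct{}) {
	closing := make(chan struct{})
	done := make(chan struct{})

	// Stands in for the sender: wait for the first request, then announce it.
	go func() {
		<-closing
		close(done)
	}()

	stop = func() {
		select {
		case closing <- struct{}{}: // the first caller wins this branch
			<-done
		case <-done: // every later caller ends up here
		}
	}
	return stop, done
}

func main() {
	stop, closed := newStopper()

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			stop()
		}()
	}
	wg.Wait()

	<-closed
	fmt.Println("all stop calls returned")
}
```

No channel is ever closed twice here: closing is never closed at all, and the second channel is closed only by the single goroutine that plays the sender.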
- A variation of the “N senders” situation: the data channel must be closed to tell receivers that data sending is over
In the N-sender situations above, we avoid closing the data channel in order to uphold the channel closing principle. However, sometimes the data channel must be closed at the end to let the receivers know that data sending is over. For such cases, we can turn an N-sender situation into a one-sender situation by using a middle channel. In the example below, the N senders send to the middle channel, which is never closed, and a single middle-layer goroutine forwards the values to the data channel. That goroutine is the only sender of the data channel, so it can close the data channel safely.
package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const Max = 1000000
	const NumReceivers = 10
	const NumSenders = 1000
	const NumThirdParties = 15

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)     // will be closed
	middleCh := make(chan int)   // will never be closed
	closing := make(chan string) // signal channel
	closed := make(chan struct{})

	var stoppedBy string

	// The stop function can be called
	// multiple times safely.
	stop := func(by string) {
		select {
		case closing <- by:
			<-closed
		case <-closed:
		}
	}

	// the middle layer
	go func() {
		exit := func(v int, needSend bool) {
			close(closed)
			if needSend {
				dataCh <- v
			}
			close(dataCh)
		}

		for {
			select {
			case stoppedBy = <-closing:
				exit(0, false)
				return
			case v := <-middleCh:
				select {
				case stoppedBy = <-closing:
					exit(v, true)
					return
				case dataCh <- v:
				}
			}
		}
	}()

	// some third-party goroutines
	for i := 0; i < NumThirdParties; i++ {
		go func(id string) {
			r := 1 + rand.Intn(3)
			time.Sleep(time.Duration(r) * time.Second)
			stop("3rd-party#" + id)
		}(strconv.Itoa(i))
	}

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					stop("sender#" + id)
					return
				}

				select {
				case <-closed:
					return
				default:
				}

				select {
				case <-closed:
					return
				case middleCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for range [NumReceivers]struct{}{} {
		go func() {
			defer wgReceivers.Done()

			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
More?
There should be more situation variants, but the ones shown above are the most common and basic. By using channels (and other concurrent programming techniques) cleverly, I believe a solution that upholds the channel closing principle can be found for each variant.
Conclusion
No situation will force you to break the channel closing principle. If you encounter such a situation, reconsider your design and rewrite your code.
Programming with Go channels is like making art.
Original link: go101.org/article/cha…