Don’t communicate by sharing memory; instead, share memory by communicating.
The underlying data structure
To understand the underlying data structure, look at the hchan source code:
type hchan struct {
    // Number of elements currently in the chan
    qcount uint
    // Length of the chan's underlying loop array
    dataqsiz uint
    // Pointer to the underlying loop array
    // Only non-nil for buffered channels
    buf unsafe.Pointer
    // Size of an element in the chan
    elemsize uint16
    // Flag marking whether the chan is closed
    closed uint32
    // Element type of the chan
    elemtype *_type
    // Index in the loop array where the next element will be sent
    sendx uint
    // Index in the loop array where the next element will be received
    recvx uint
    // The queue of goroutines waiting to receive
    recvq waitq
    // The queue of goroutines waiting to send
    sendq waitq
    // Protects all fields in hchan
    lock mutex
}
buf points to the underlying loop array and is only non-nil for buffered channels.
sendx and recvx are both indexes into the underlying loop array; they mark the positions (relative to the underlying array) where the next element will be sent and received.
recvq and sendq hold goroutines that are blocked because they tried to read from the channel or send data to it, respectively.
waitq is a doubly linked list of sudog, which is essentially a wrapper around a goroutine:
type waitq struct {
    first *sudog
    last  *sudog
}
lock protects all the fields in hchan and ensures that every operation that reads or writes the channel is atomic.
For example, the data structure of a channel with capacity 6 and element type int looks like this:
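As a rough illustration (a sketch, not runtime code; the elemsize value assumes a 64-bit platform), here is what those fields would hold right after make(chan int, 6):

package main

func main() {
    // A buffered channel of ints with capacity 6.
    ch := make(chan int, 6)

    // Conceptually, the runtime's hchan for ch now holds:
    //   qcount   = 0      // no elements queued yet
    //   dataqsiz = 6      // length of the ring buffer
    //   buf      = pointer to a [6]int loop array
    //   elemsize = 8      // size of int on 64-bit platforms
    //   closed   = 0
    //   sendx    = 0, recvx = 0
    //   recvq    = empty, sendq = empty
    _ = ch
}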
Create a chan
In general, use make to create a channel that can send and receive:
// Unbuffered channel
ch1 := make(chan int)
// Buffered channel with capacity 10
ch2 := make(chan int, 10)
From assembly analysis, we know that the function that ultimately creates chan is makechan:
func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // ...
    // Calculate the amount of memory needed
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:
        // 1. The channel is unbuffered, or the element size is 0 (e.g. struct{}),
        // so buf is not really needed: point it at the start of the chan itself.
        // Only the send and receive cursors are used; nothing is ever copied
        // into c.buf (that would overwrite the chan's own fields).
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // 2. The elements contain no pointers, so GC does not need to scan them.
        // Allocate "hchan struct size + element size * count" in one block.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 3. The elements contain pointers: allocate hchan and buf separately.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    // Length of the loop array
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
    // ...
    return c
}
A newly created chan is allocated on the heap.
What is the nature of sending to and receiving from a channel
A channel send or receive is essentially a copy of a value: from the sender goroutine’s stack into the chan buf, from the chan buf into the receiver goroutine’s stack, or directly from the sender goroutine to the receiver goroutine.
Here’s an example:
package main

import (
    "fmt"
    "time"
)

type user struct {
    name string
    age  int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
    fmt.Println("modifyUser Received Value", pu)
    pu.name = "Anand"
}

func printUser(u <-chan *user) {
    time.Sleep(2 * time.Second)
    fmt.Println("printUser goRoutine called", <-u)
}

func main() {
    c := make(chan *user, 5)
    c <- g
    fmt.Println(g)
    // modify g
    g = &user{name: "Ankur Anand", age: 100}
    go printUser(c)
    go modifyUser(g)
    time.Sleep(5 * time.Second)
    fmt.Println(g)
}
&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}
This is a good example of sharing memory by communicating.
First, a struct u is constructed at address 0x56420. Then &u is assigned to the pointer g, which itself lives at address 0x565bb0 and whose content is the address of u.
In main, g is first sent to c. Because a send copies the value, what enters the chan buf is 0x56420, i.e. the value of the pointer g (not what it points to). That is why the element received from the channel prints as &{Ankur 25}. So instead of “sending” the pointer g to the channel, we simply copy its value.
What is the process of receiving data from a channel
Source code analysis
The receive operation can be written in two ways: one with “ok”, which reports whether the channel has been closed, and one without “ok”, where there is no way to tell whether a received zero value came from a real sender or is the zero value of the element type returned because the channel is closed. Both have their own use cases.
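A small sketch of the two forms in user code (the channel and values here are made up for illustration):

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 42
    close(ch)

    // Form 1: without "ok". A zero value here could be a real 0 sent by the
    // sender or the zero value returned because the channel is closed.
    v := <-ch
    fmt.Println(v) // 42

    // Form 2: with "ok". ok is false only when the channel is closed and
    // already drained, so the two cases can be told apart.
    v, ok := <-ch
    fmt.Println(v, ok) // 0 false
}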
After being processed by the compiler, these two methods correspond to the two functions in the source code:
// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}
The chanrecv1 function handles the case without “ok”, while chanrecv2 additionally returns the received field, which reports whether the value was actually delivered by a sender rather than produced because the channel was closed. Either way, both end up calling the chanrecv function:
// Located in src/runtime/chan.go
// chanrecv receives an element from channel c and writes it to the memory address pointed to by ep.
// If ep is nil, the received value is ignored.
// If block == false and no data can be received, return (false, false).
// Otherwise, if c is closed, zero the address ep points to and return (true, false).
// Otherwise, fill the memory pointed to by ep with the value and return (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // ...
    if c == nil {
        // Non-blocking receive on a nil channel: return (false, false)
        if !block {
            return
        }
        // Otherwise, receiving from a nil channel parks the goroutine forever
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for a failed non-blocking operation without acquiring the lock.
    if !block && empty(c) {
        // If the channel is not closed, the receive fails: return (false, false)
        if atomic.Load(&c.closed) == 0 {
            return
        }
        // The channel is irreversibly closed; re-check whether it is empty.
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            // Closed and empty: return (true, false)
            return true, false
        }
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    // The channel is closed and the loop array buf holds no elements.
    // This covers a closed unbuffered channel and a closed buffered channel
    // whose buf is empty.
    // A closed buffered channel that still has elements in buf can still
    // deliver them to receivers.
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            // Receiving from a closed channel without ignoring the return value:
            // the received value is the zero value of the element type.
            // typedmemclr zeroes the memory at ep according to the type.
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // If there is a goroutine in the send waiting queue, buf is full (or absent).
    // This can be:
    // 1. An unbuffered channel
    // 2. A buffered channel whose buf is full
    // For 1, copy memory directly from the sender goroutine to the receiver goroutine.
    // For 2, receive the element at the head of the loop array and place the
    // sender's element at the tail of the loop array.
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // Buffered channel with elements in buf: receive normally.
    if c.qcount > 0 {
        // Find the element to receive directly in the loop array
        qp := chanbuf(c, c.recvx)
        // Unlike "<-ch", for "val := <-ch" ep points to val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // Clear the corresponding slot in the loop array
        typedmemclr(c.elemtype, qp)
        // Advance the receive cursor
        c.recvx++
        // Wrap the receive cursor back to zero
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // One fewer element in buf
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // Non-blocking receive: unlock. selected is false because no value was received.
        unlock(&c.lock)
        return false, false
    }

    // From here on the receive blocks.
    // Construct a sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // Save the address that will receive the data
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // Enqueue on the channel's receive waiting queue
    c.recvq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    // Park the current goroutine
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // Woken up: clean up from here on
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}
The code above is annotated fairly thoroughly; you can read it line by line. Let’s walk through the main points.
- If the channel is nil: in non-blocking mode, it simply returns; in blocking mode, gopark is called to suspend the goroutine, which then blocks forever. When a channel is nil, the only way to unblock the receive would be to close the channel, but closing a nil channel panics, so the goroutine never gets a chance to wake up. More details can be found in the closechan function.
In non-blocking mode, this case can be skipped with a default branch:
select {
case msg := <-messages:
    fmt.Println("received message", msg)
case sig := <-signals:
    fmt.Println("received signal", sig)
default:
    fmt.Println("no activity")
}
- As with the send function, the next step quickly detects a failure in non-blocking mode and returns without acquiring the lock. As an aside, when writing code, handling boundary conditions early and returning quickly makes the logic clearer: the normal path is less cluttered, and whoever reads the code can focus on the core logic.
if !block && empty(c) {
    // ...
    return
}

func empty(c *hchan) bool {
    // c.dataqsiz is immutable.
    if c.dataqsiz == 0 {
        return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
    }
    return atomic.Loaduint(&c.qcount) == 0
}
We can observe that a channel is not ready to receive when it is:
- unbuffered, and there is no goroutine waiting in the send queue;
- buffered, but there are no elements in buf.
The next step takes the lock, which is relatively coarse-grained. If the channel is closed and the loop array buf holds no elements, which covers both a closed unbuffered channel and a closed buffered channel with an empty buf, the function returns the zero value of the element type, but the received flag is false, telling the caller that the channel was closed and the value obtained was not sent normally by a sender. In a select context, however, this case is still selected. Many scenarios that use a channel as a notification signal hit this branch.
If there is a goroutine in the send waiting queue, the channel is either unbuffered, or buffered with a full buf. In both cases data can be received normally.
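A minimal sketch of that notification pattern, assuming the worker just sleeps to simulate work:

package main

import (
    "fmt"
    "time"
)

func main() {
    done := make(chan struct{})

    go func() {
        time.Sleep(100 * time.Millisecond) // simulate some work
        close(done)                        // broadcast "finished" to all receivers
    }()

    // The receive unblocks as soon as done is closed; the value is the zero
    // value of struct{} and, inside chanrecv, received is false.
    <-done
    fmt.Println("worker finished")
}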
Hence, the recv function is called:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // The channel is unbuffered
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        // The received data is not ignored
        if ep != nil {
            // Copy the data directly from the sender goroutine to the receiver goroutine
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Buffered channel with a full buf.
        // Copy the element at the head of the loop array buf to the receive address,
        // then place the sender's data at the tail. recvx and sendx are equal here.
        // Locate the receive cursor
        qp := chanbuf(c, c.recvx)
        // Copy the data at the receive cursor to the receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // Copy the sender's data into buf
        typedmemmove(c.elemtype, qp, sg.elem)
        // Update the cursors
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // Wake up the sending goroutine; it still has to wait for the scheduler
    goready(gp, skip+1)
}
If it is unbuffered, it is copied directly from the sender’s stack to the receiver’s stack.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
Otherwise the channel is buffered and buf is full. Note that the send and receive cursors overlap at this point. So first locate the receive cursor:
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
The element at the receive cursor is copied to the receive address, and the data the sender wants to send is then copied into that slot. This completes both the receive and the send. The send and receive cursors each advance by one, wrapping back to zero if they roll over. Finally, the goroutine in the sudog is taken out and goready is called to change its state to runnable; the woken sender then waits for the scheduler.
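A small runnable sketch of this case; the sleep is only there so the extra sender has time to block, and the printed order shows that FIFO order is preserved:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 2)
    ch <- 1
    ch <- 2 // buf is now full

    go func() {
        ch <- 3 // blocks: this sender goroutine is parked on sendq
    }()
    time.Sleep(10 * time.Millisecond) // give the sender time to block

    // The receiver takes the element at the head of the ring buffer, and the
    // blocked sender's value is copied into the freed slot at the tail.
    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}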
- Next, if the channel’s buf still contains data, a normal receive happens. Note that execution can reach this branch even if the channel is closed. The step is simple: copy the data at buf’s receive cursor to the address that receives the data.
- Finally comes the case that blocks. Of course, if the value of block passed in is false, it does not block and simply returns.
First a sudog is constructed and its fields are filled in. Note that the address that will receive the data is stored in the elem field; when the goroutine is woken up, the received data will be at the address this field points to. The sudog is then added to the channel’s recvq queue, and gopark is called to suspend the goroutine.
The remaining code handles cleanup after the goroutine is woken up.
Case analysis
The process of receiving data from a channel and sending data to a channel is illustrated using the following example:
package main

import (
    "fmt"
    "time"
)

func goroutineA(a <-chan int) {
    val := <-a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <-b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}
We first create an unbuffered channel, then start two goroutines, passing the channel to each. Then we send the value 3 to the channel and finally sleep for one second before the program exits.
The make call creates an unbuffered channel. Let’s look at the chan’s overall state through some of the important fields in the chan structure.
Next, the two go statements each start a goroutine, and each performs a receive operation. From the earlier source analysis we know that both goroutines (call them G1 and G2) block on the receive. G1 and G2 hang on the channel’s recvq queue, forming a doubly linked list.
buf points to an array of length 0 and qcount is 0, meaning there are no elements in the channel. Focus on recvq and sendq, which are of type waitq; waitq is really a doubly linked list whose elements are sudog. A sudog contains a g field, and g represents a goroutine, so a sudog can be thought of as a goroutine. recvq stores goroutines that tried to read from the channel and were blocked; sendq stores goroutines that tried to write to the channel and were blocked.
At this point, recvq has two goroutines hanging on it, G1 and G2, the ones started earlier. G1 and G2 are blocked because no goroutine is sending data and the channel is unbuffered. sendq has no blocked goroutines.
The recvq data structure looks like this:
Let’s look at chan’s state as a whole:
G1 and G2 are suspended; their state is WAITING.
Back to the example: when G1 (go goroutineA(ch)) reaches val := <-a, it goes from running to waiting (via gopark):
G1 disconnects from M, but the scheduler can’t keep M idle, so it schedules another Goroutine to run:
The same goes for G2. Now G1 and G2 are suspended, waiting for a sender to send data to the channel.
What is the process of sending data to a channel
Source code analysis
The send operation is ultimately converted into the chansend function. Here is the source, again mostly annotated, so the main flow should be easy to follow:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        // Cannot block: return false to indicate the send did not succeed
        if !block {
            return false
        }
        // The current goroutine is parked forever
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // ...

    // For a non-blocking send, quickly detect the failure scenarios:
    // the channel is not closed and has no free buffer space. This can be:
    // 1. The channel is unbuffered and there is no goroutine in the receive waiting queue
    // 2. The channel is buffered, but the loop array is full
    if !block && c.closed == 0 && full(c) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // If there is a goroutine in the receive waiting queue,
    // copy the data being sent directly to that receiving goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // For a buffered channel with free buffer space
    if c.qcount < c.dataqsiz {
        // qp points to the sendx slot of buf
        qp := chanbuf(c, c.sendx)
        // Copy the data from ep into qp
        typedmemmove(c.elemtype, qp, ep)
        // Advance the send cursor
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // One more element in the buffer
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // Non-blocking send that cannot proceed: unlock and return false
    if !block {
        unlock(&c.lock)
        return false
    }

    // The channel is full and the sender blocks. Construct a sudog.
    // Get a pointer to the current goroutine
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // The current goroutine enters the send waiting queue
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
    // The current goroutine is parked
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    KeepAlive(ep)

    // Woken up: clean up from here on
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // Unbind the channel from mysg
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // The channel was closed while we were blocked: panic
        panic(plainError("send on closed channel"))
    }
    return true
}
The code comments are fairly detailed; let’s go through the main points.
- If the channel is nil: a non-blocking send returns false; a blocking send suspends the current goroutine forever.
- For a non-blocking send, if the channel is not closed and there is no free buffer space (either a. the channel is unbuffered and no goroutine is waiting in the receive queue, or b. the channel is buffered but the loop array is full), the send fails fast.
The runtime source has extensive comments on this. The point of this statement is to detect a failed non-blocking send quickly, without taking the lock, and return early:
if !block && c.closed == 0 && full(c) {
    return false
}

func full(c *hchan) bool {
    if c.dataqsiz == 0 {
        return c.recvq.first == nil
    }
    return c.qcount == c.dataqsiz
}
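In user code, block == false typically comes from a select with a default case. A minimal sketch, assuming a buffered channel that is already full:

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 1 // fill the buffer

    // A select with a default case compiles into a non-blocking send
    // (block == false); full(ch) is true here, so the default branch runs.
    select {
    case ch <- 2:
        fmt.Println("sent")
    default:
        fmt.Println("buffer full, not blocking")
    }
}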
The source comments explain why this check can run without the lock, so let me expand on it. The if condition reads two variables first: block and c.closed. block is a function argument and never changes; c.closed can be changed by other goroutines, because no lock is held. Those are the first two expressions of the condition. Inside full, c.dataqsiz is never modified after creation, so reading it is safe. What is really affected by not locking are c.qcount and c.recvq.first. The goal is to take the lock less often and improve performance.
If the channel is closed, the send panics.
If a sudog (representing a goroutine) can be dequeued from recvq, the channel holds no buffered elements and there is a waiting receiver. The send function is called to copy the element directly from the sender’s stack to the receiver’s stack, with sendDirect doing the critical work:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ...
    // sg.elem points to the location where the received value will be stored,
    // e.g. for val := <-ch it is &val
    if sg.elem != nil {
        // Copy the memory directly (from the sender to the receiver)
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // The goroutine bound to the sudog
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // Wake up the receiving goroutine; skip is related to stack printing
    goready(gp, skip+1)
}
Continue with the sendDirect function:
// Sending to an unbuffered channel, or receiving from a channel with no queued
// elements (unbuffered, or buffered but empty), makes one goroutine operate
// directly on another goroutine's stack.
// GC assumes that stack writes only happen while a goroutine is running and
// are performed by that goroutine itself, so this violates that assumption.
// Write barriers are needed to keep it correct.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src is on the current goroutine's stack, dst is on another goroutine's stack.
    // Perform a memory "move".
    // If the destination stack shrinks, the dst we read from sg.elem must not
    // go stale before the write happens,
    // hence the barrier between reading and writing.
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}
This involves one goroutine writing directly to another goroutine’s stack. Normally each goroutine’s stack is private to it, and this also violates some of the GC’s assumptions. To avoid problems, a write barrier is inserted so the write completes correctly. The benefit is saving one memory copy: instead of copying into channel buf first, the data goes straight from the sender to the receiver.
Then the lock is released and the receiver is woken up; once the scheduler runs it, the receiver continues executing the code after the receive operation.
- If c.qcount < c.dataqsiz, the buffer has free space (this is necessarily a buffered channel). First use chanbuf to find the slot the element to be sent should go to:
qp := chanbuf(c, c.sendx)
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
c.sendx points to the position in the loop array where the next element will be sent, and typedmemmove copies the element into that slot. After that, c.sendx is incremented by 1 and the element count is incremented by 1: c.qcount++. Finally, unlock and return.
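A tiny sketch of this buffered path as seen from user code, using len and cap to observe qcount and dataqsiz:

package main

import "fmt"

func main() {
    ch := make(chan int, 3)

    ch <- 10 // qcount becomes 1, sendx advances to 1
    ch <- 20 // qcount becomes 2, sendx advances to 2

    // len reports qcount and cap reports dataqsiz, so the cursor bookkeeping
    // described above is observable from user code.
    fmt.Println(len(ch), cap(ch)) // 2 3
}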
- If none of the cases above matched, the channel is full. Whether the channel is buffered or unbuffered, the sender will be “locked up” (the goroutine blocks). If block is false, just unlock and return false.
- Finally comes the case that really has to block: construct a sudog, enqueue it on the channel’s sendq field, then call gopark to suspend the current goroutine, unlock, and wait to be woken up at the right moment.
After waking up, execution continues from the line after gopark. The sudog binds the goroutine in its g field, and the goroutine binds the sudog in its waiting field. The sudog also records the address of the element to be sent in its elem field, and its c field records the channel on which it got stuck.
So the address of the element to be sent is saved in the sudog structure of the goroutine that is currently parked.
Case analysis
OK, that’s enough source code. Let’s return to the example:
package main

import (
    "fmt"
    "time"
)

func goroutineA(a <-chan int) {
    val := <-a
    fmt.Println("goroutine A received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <-b
    fmt.Println("goroutine B received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}
In the receive section we saw that G1 and G2 are suspended, waiting for a sender to rescue them. Now the main goroutine sends the element 3 to ch; let’s see what happens next.
The sender finds that recvq holds goroutines waiting to receive, so it dequeues the first sudog, the one wrapping G1.
The sender then copies the element being sent directly to the elem address recorded in that sudog, and finally wakes up G1 by calling goready, which changes its state to runnable and adds it to P’s runnable goroutine queue.
When the scheduler gets around to G1, it puts G1 into the running state and continues executing goroutineA from where it was parked. (G here stands for any other goroutine that may be running.)
Under what circumstances can a channel cause a resource leak
A leak happens when a goroutine operating on a channel is blocked in a send or receive, and the channel stays full or empty forever with no chance of changing. Because the goroutine is still referenced from the channel’s wait queue, the garbage collector never reclaims it, and it sits in the queue forever, never to see the light of day.
Conversely, if during execution no goroutine references a channel any more, the GC will reclaim the channel itself whether or not it was closed, so that alone does not cause a memory leak.
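A minimal sketch of such a leak; the leak function and the printed count are purely illustrative:

package main

import (
    "fmt"
    "runtime"
    "time"
)

// leak starts a goroutine that sends on a channel nobody ever receives from.
// That goroutine stays parked on the channel's sendq forever and is never collected.
func leak() {
    ch := make(chan int)
    go func() {
        ch <- 1 // blocks forever: no receiver will ever arrive
    }()
}

func main() {
    for i := 0; i < 10; i++ {
        leak()
    }
    time.Sleep(100 * time.Millisecond)
    // The leaked goroutines are still alive, in addition to main itself.
    fmt.Println("goroutines:", runtime.NumGoroutine())
}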
How to gracefully close a channel
The principle: do not close a channel from the receiver side, and do not close a channel that has multiple senders.
After all, it is the sender that puts elements into the channel, so the sender can decide when there is nothing more to send and close the channel. But with multiple senders, no single sender knows what the others are doing, so none of them can safely close the channel.
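A small sketch showing why: receiving from a closed channel is safe, but sending on one always panics.

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    close(ch)

    // Receiving from a closed channel is safe: it returns the zero value
    // and ok == false once the buffer is drained.
    v, ok := <-ch
    fmt.Println(v, ok) // 0 false

    // Sending on a closed channel panics, which is why a receiver (or one of
    // several senders) must not close a channel that others may still write to.
    defer func() {
        fmt.Println("recovered:", recover())
    }()
    ch <- 1 // panic: send on closed channel
}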
There are two less elegant ways to close a channel:
- Use the defer-recover mechanism, so you can close a channel or send to it without worrying; even if a panic occurs, defer-recover acts as a safety net.
- Use sync.Once to ensure the channel is closed only once (both approaches are sketched below).
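A minimal sketch of both workarounds; the names SafeSend and SafeCloser are hypothetical helpers for illustration, not anything from the standard library:

package main

import (
    "fmt"
    "sync"
)

// SafeSend swallows the panic from sending on a closed channel and reports
// whether the channel was already closed.
func SafeSend(ch chan int, value int) (closed bool) {
    defer func() {
        if recover() != nil {
            closed = true
        }
    }()
    ch <- value
    return false
}

// SafeCloser uses sync.Once so that concurrent callers close the channel at
// most once instead of panicking on a double close.
type SafeCloser struct {
    ch   chan int
    once sync.Once
}

func (c *SafeCloser) Close() {
    c.once.Do(func() { close(c.ch) })
}

func main() {
    c := &SafeCloser{ch: make(chan int, 1)}
    c.Close()
    c.Close()                      // second call is a no-op thanks to sync.Once
    fmt.Println(SafeSend(c.ch, 1)) // true: the channel is already closed
}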
So how do you gracefully close a channel?
According to the number of sender and receiver, there are the following cases:
- one sender, one receiver
- one sender, M receivers
- N senders, one receiver
- N senders, M receivers
For cases 1 and 2 there is not much to say: there is only one sender, so just close the channel from that sender. Let’s focus on cases 3 and 4.
In the third case, the only receiver says “please stop sending more” by closing an additional signal channel.
The solution is to add a signal channel; the receiver tells the senders to close the data channel by closing this signal channel. The senders listen for the close signal and stop sending data. The code looks like this:
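A minimal sketch of cases 1 and 2, where the single sender closes the channel when it is done:

package main

import "fmt"

func main() {
    ch := make(chan int)

    // Exactly one sender, so it alone decides when to close.
    go func() {
        for i := 0; i < 3; i++ {
            ch <- i
        }
        close(ch) // only the sender closes
    }()

    // One (or several) receivers: range exits once ch is closed and drained.
    for v := range ch {
        fmt.Println(v)
    }
}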
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumSenders = 1000

    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})

    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                select {
                case <-stopCh:
                    return
                case dataCh <- rand.Intn(Max):
                }
            }
        }()
    }

    // the receiver
    go func() {
        for value := range dataCh {
            if value == Max-1 {
                fmt.Println("send stop signal to senders.")
                close(stopCh)
                return
            }
            fmt.Println(value)
        }
    }()

    select {
    case <-time.After(time.Hour):
    }
}
stopCh is the signal channel here; it has only one sender (the receiver of dataCh), so it can simply be closed. After the senders receive the close signal, the select branch case <-stopCh is chosen and the function returns, so no more data is sent.
Note that the code above does not explicitly close dataCh. In Go, if a channel ends up with no goroutines referencing it, it will eventually be garbage collected whether or not it is closed. So in this case, gracefully closing the channel means not closing it at all and letting the GC take care of it. In the last case, the graceful way to close the channel is: any one of them says “let’s end the game” by notifying a moderator to close an additional signal channel.
Unlike the third case, there are now M receivers. If we adopted the third solution directly and let each receiver close stopCh, the channel could be closed more than once, causing a panic. So a moderator is added: the M receivers send their “close dataCh” requests to the moderator, and after receiving the first request the moderator issues the close instruction (by closing stopCh; it cannot be double-closed, because only the moderator closes it). The N senders can likewise send close requests to the moderator.
package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})

    // It must be a buffered channel.
    toStop := make(chan string, 1)

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        // Report which goroutine triggered the stop
        fmt.Println("stopped by", stoppedBy)
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(Max)
                if value == 0 {
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                select {
                case <-stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            for {
                select {
                case <-stopCh:
                    return
                case value := <-dataCh:
                    if value == Max-1 {
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }
                    fmt.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    select {
    case <-time.After(time.Hour):
    }
}
toStop is the moderator in this code. It receives the requests to close dataCh from the senders and receivers.
toStop is declared as a buffered channel. If it were unbuffered, the first close request could be lost: both senders and receivers send their request through a select statement, and if the moderator’s goroutine is not yet ready, the select falls through to default and does nothing, so the first request to close dataCh is lost.
If instead we declare the capacity of toStop as Num(senders) + Num(receivers), the part that sends the close request can be written in a more terse form:
// ...
toStop := make(chan string, NumReceivers+NumSenders)
// ...
value := rand.Intn(Max)
if value == 0 {
    toStop <- "sender#" + id
    return
}
// ...
if value == Max-1 {
    toStop <- "receiver#" + id
    return
}
// ...
Here the request is sent to toStop directly: toStop is large enough that blocking is not a concern, so there is no need for a default case in a select statement to avoid blocking.