🧑🏫 Understanding chan, Go's synchronization component.
An in-depth analysis of the chan source code and of how chan works.
Learning can be dry, but persistence pays off. My understanding of the source code is written into the code comments. The Go source code is much more comfortable to read than Java's, if only because of the environment, ha ha ha. Reading the Spring source code in Java is a real chore.
🏷️ Sending data to chan.
1.1 Overview: chan handles a send in one of several ways.
- First case: the chan is closed. The runtime panics directly, because data cannot be sent to a closed chan.
- Second case: check whether the chan's queue of goroutines waiting to receive is empty. If it is not, hand the data directly to the first waiting receiver.
- Third case: check whether the chan's ring buffer has free space. If it does, copy the data to be sent into it.
- Fourth case: if the send is non-blocking, return directly. If it is blocking, add the current goroutine to the chan's queue of senders waiting to send.
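The cases above can be observed from ordinary user code. The following is a minimal sketch, not runtime code: the helper names trySend and sendPanics are made up for illustration, and a select with a default case is used as the user-level way to request a non-blocking send.

```go
package main

import "fmt"

// trySend attempts a non-blocking send (hypothetical helper).
// A select with a default case never parks the goroutine.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendPanics reports whether a plain send of v on ch panics
// (hypothetical helper). Only call it where the send cannot block.
func sendPanics(ch chan int, v int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	// First case: sending on a closed channel panics.
	closed := make(chan int, 1)
	close(closed)
	fmt.Println("closed channel panics:", sendPanics(closed, 1))

	// Third case: the buffer has room, so the value is stored in it.
	buffered := make(chan int, 1)
	fmt.Println("send into free buffer:", trySend(buffered, 1))

	// Fourth case, non-blocking: the buffer is full and nobody is receiving.
	fmt.Println("send into full buffer:", trySend(buffered, 2))

	// A non-blocking send on a nil channel also fails immediately.
	var nilCh chan int
	fmt.Println("send on nil channel:", trySend(nilCh, 3))
}
```

The second case (direct handoff to a waiting receiver) depends on goroutine scheduling, so it is left out of this sketch.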
1.2 Source code analysis.
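A send statement such as c <- 1 is compiled into a call to runtime.chansend1, and inside that function is a call to chansend with block set to true. A non-blocking send written as a select with a default case also reaches chansend, with block set to false. So chan sends data through the chansend function.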
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
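As a user-level illustration of the blocking form (block set to true), here is a small sketch. The function name handoff is hypothetical. Whether the runtime finds the receiver already waiting or parks the sender first depends on scheduling; either way the send returns only after the value has been taken.

```go
package main

import "fmt"

// handoff sends v with a plain blocking send on an unbuffered channel
// and returns the value the receiving goroutine observed.
func handoff(v int) int {
	ch := make(chan int)   // unbuffered, so there is no ring buffer to fall back on
	done := make(chan int) // carries the received value back to the caller

	go func() {
		done <- <-ch // blocking receive, then report what arrived
	}()

	ch <- v // blocking send: returns only after a receiver has taken v
	return <-done
}

func main() {
	fmt.Println(handoff(42))
}
```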
Let's take a look at the overall source code.
In the analysis below, I have removed the extraneous code and comments.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// chansend is the function every send on a chan goes through.
	if c == nil {
		if !block {
			return false
		}
		// A blocking send on a nil chan parks the goroutine forever.
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	if !block && c.closed == 0 && full(c) {
		// Non-blocking, not closed, and full: fail without taking the lock.
		return false
	}
	lock(&c.lock)
	// closed == 0 means the chan is open; sending on a closed chan panics.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// Step 1: regardless of whether the ring buffer has a free slot, if a goroutine is waiting to receive, send the data directly to that G from the wait queue.
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// Step 2: if no G is waiting to receive and the ring buffer has free space, write directly to the chan's buffer.
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		// Compute the address of the slot at index sendx.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		// Copy the data being sent into the buffer.
		// ep points to the data to be sent this time.
		typedmemmove(c.elemtype, qp, ep)
		// Advance the send index, wrapping around at the end of the buffer.
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// Increase the element count.
		c.qcount++
		unlock(&c.lock)
		return true
	}
	// Check whether the send may block.
	// A send in a select with a default case is non-blocking:
	// it reaches this point with block == false.
	// A blocking send gives up the CPU while it waits.
	if !block {
		// The non-blocking select form takes this branch.
		// It returns at once without giving up the CPU.
		unlock(&c.lock)
		return false
	}
	// No goroutine is waiting to receive and the buffer has no room.
	// The current goroutine is added to the queue of waiting senders.
	// Then it calls gopark.
	// The implementation is fairly involved.
	// Block on the channel. Some receiver will complete our operation for us.
	// getg returns the goroutine that is sending the data.
	gp := getg()
	// Acquire a runtime.sudog, the structure that represents the current G in a wait queue, and fill in the details of this blocked send.
	mysg := acquireSudog()
	mysg.releasetime = 0
	// t0 is set by the block-profiling setup that was trimmed from this excerpt.
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	// Record the new sudog on the current goroutine as the one it is waiting on.
	gp.waiting = mysg
	gp.param = nil
	// Add the sudog for the current goroutine to this chan's queue of waiting senders.
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	// Put the current goroutine to sleep until it is woken up.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)
	// From here on the current goroutine has been woken up. It does some cleanup: it resets a few fields and releases the runtime.sudog.
	// The sudog must still be the one recorded on this goroutine.
	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	// The rest is bookkeeping.
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
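The index arithmetic in the buffered branch (advance sendx, wrap to 0 at dataqsiz, bump qcount) can be modelled with a toy ring buffer. This is a sketch only: the ring type and its methods are hypothetical, it holds plain ints, and it has none of the locking or wait queues of the real hchan. The field names are borrowed from the runtime purely for readability.

```go
package main

import "fmt"

// ring is a toy model of the buffer fields in hchan (illustrative only).
type ring struct {
	buf    []int
	sendx  int // index of the next slot to write
	recvx  int // index of the next slot to read
	qcount int // number of elements currently stored
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push mirrors the buffered-send branch: store, advance sendx, wrap, count.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false // full: the runtime would block or report failure here
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// pop mirrors the buffered-receive branch: load, clear, advance recvx, wrap, count.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot, as typedmemclr does in the runtime
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println("push on full ring:", r.push(3))
	v, _ := r.pop()
	fmt.Println("popped:", v)
	r.push(3) // reuses slot 0 after the wrap
	fmt.Println("sendx, recvx, qcount:", r.sendx, r.recvx, r.qcount)
}
```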
🏷️ Receiving data from chan.
2.1 Overview: chan handles a receive in one of several ways.
- First case: the receive is non-blocking, the buffer is empty, and the chan is not closed. Return directly.
- Second case: the chan is closed and the buffer is empty. Return the zero value.
- Third case: if the head of the chan's queue of waiting senders is not nil, complete the receive using the sender at the head of the queue (for an unbuffered chan the data is copied straight from that sender).
- Fourth case: if the ring buffer's qcount is not 0, take the data directly from the ring buffer.
- Fifth case: if the receive is non-blocking, return directly. If it is blocking, the goroutine enters the queue of receivers waiting for data.
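The receive cases can likewise be seen from user code. A minimal sketch follows; tryRecv is a hypothetical helper, and its two boolean results are chosen to resemble the (selected, received) pair that chanrecv returns.

```go
package main

import "fmt"

// tryRecv attempts a non-blocking receive (hypothetical helper).
// selected reports whether the receive case ran at all;
// ok reports whether v came from a real send rather than a closed chan.
func tryRecv(ch chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-ch:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	// First case: open and empty, so a non-blocking receive gives up.
	open := make(chan int, 1)
	v, ok, selected := tryRecv(open)
	fmt.Println("open, empty:", v, ok, selected)

	// Fourth case: buffered data is still delivered after close.
	ch := make(chan int, 2)
	ch <- 7
	close(ch)
	v, ok, selected = tryRecv(ch)
	fmt.Println("closed, buffered:", v, ok, selected)

	// Second case: closed and drained, so the zero value comes back with ok == false.
	v, ok, selected = tryRecv(ch)
	fmt.Println("closed, empty:", v, ok, selected)
}
```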
2.2 Source code analysis.
Go receives data from a chan by eventually calling chanrecv, which completes the receive. In the following code I have removed the irrelevant code and comments.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// chanrecv is the function every receive from a chan goes through.
	// Handle a nil chan first.
	if c == nil {
		// block == true means a blocking receive.
		if !block {
			// Non-blocking: return directly. The named results default to false.
			return
		}
		// gopark puts the goroutine to sleep; on a nil chan it never wakes up.
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	// block == true means the receive may block.
	// empty reports whether the chan has nothing to receive: for a buffered chan, no data in the ring buffer; for an unbuffered chan, no waiting sender.
	// Fast path: check for failed non-blocking operation without acquiring the lock.
	// If the receive is non-blocking and the chan is empty, return.
	if !block && empty(c) {
		// Past this branch the remaining cases are: non-blocking and non-empty, or blocking.
		// Atomically load c.closed to check whether the chan is closed.
		if atomic.Load(&c.closed) == 0 {
			// Non-blocking, empty, and not closed: return (false, false).
			return
		}
		// The chan is closed.
		if empty(c) {
			// For an unbuffered chan there is no waiting sender; for a buffered chan qcount is 0.
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			// If the caller wants the value (ep is not nil), clear it to the zero value.
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			// Note: for an unbuffered chan, empty is what checks whether the send queue has a waiting sender.
			// In Go, only select should trigger the non-blocking path.
			return true, false
		}
	}
	// If the code reaches this point, the receive is either non-blocking with a non-empty chan, or blocking.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	lock(&c.lock)
	// closed == 0 means the chan is not closed.
	// Here the chan is closed and the buffer is empty.
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	// Reaching this point, first check whether a sender is waiting.
	if sg := c.sendq.dequeue(); sg != nil {
		// A sender is waiting: receive with its help.
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// The buffer is not empty.
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	// If the receive is non-blocking, return.
	if !block {
		unlock(&c.lock)
		return false, false
	}
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// The goroutine that wants to receive is added to the queue of waiting receivers.
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
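The success value returned at the end of the blocking path is what user code sees as the second result of v, ok := <-ch. The sketch below shows both outcomes; the result type and the blockedRecv helper are hypothetical. The receiver may or may not already be parked when the wake function runs, but the observed result is the same either way.

```go
package main

import "fmt"

// result holds what a blocking receive observed (illustrative type).
type result struct {
	v  int
	ok bool
}

// blockedRecv starts a goroutine that does a blocking receive on a fresh
// unbuffered channel, then calls wake to complete that receive.
func blockedRecv(wake func(chan int)) result {
	ch := make(chan int)
	out := make(chan result)

	go func() {
		v, ok := <-ch // blocking receive
		out <- result{v, ok}
	}()

	wake(ch)
	return <-out
}

func main() {
	// Completed by a sender: the value arrives and ok is true.
	sent := blockedRecv(func(ch chan int) { ch <- 42 })
	fmt.Println("woken by send:", sent.v, sent.ok)

	// Completed by close: the zero value arrives and ok is false.
	closed := blockedRecv(func(ch chan int) { close(ch) })
	fmt.Println("woken by close:", closed.v, closed.ok)
}
```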
Follow my WeChat official account: pony teach you to write bugs. Follow me on bilibili: LaoMaii. Visit my personal website: Mazhenxin.xyz