Preface
Let's start with a channel layout diagram. The underlying implementation of a channel is not complicated and needs no advanced knowledge: it is built around one ring queue and two linked lists. By the end of this article you should have a solid grasp of how channel is implemented.
Introduction of the channel
- A channel is a typed pipe through which goroutines send and receive messages
- It is the means of communication between goroutines that Go provides at the language level
You are probably familiar with using channels in daily development. Once you know the basics, though, are you curious about the underlying implementation and why it makes concurrent communication between goroutines possible? With that curiosity, let's look at the underlying source code of channel.
Using a channel
Here is the simplest way to use channel:
package main

import "fmt"

func main() {
	c := make(chan int)
	go func() {
		// Send data to the channel
		c <- 1
	}()
	// Receive data from the channel
	x := <-c
	close(c)
	fmt.Println(x)
}
Channel source entry
Constructs such as make and <- have no directly corresponding functions in the source code; the compiler translates them into calls to the runtime implementation. Use the following command to compile the Go source into assembly:
go tool compile -N -l -S main.go > hello.s
Filtering the output for CALL instructions shows the core calls:
0x0043 00067 (main.go:42) CALL runtime.makechan(SB)
0x006a 00106 (main.go:44) CALL runtime.newproc(SB)
0x008b 00139 (main.go:47) CALL runtime.chanrecv1(SB)
0x0032 00050 (main.go:45) CALL runtime.chansend1(SB)
0x00a3 00163 (main.go:48) CALL runtime.closechan(SB)
You can infer the correspondence:
- make(chan int) corresponds to the runtime.makechan function
- Goroutine creation (the go statement) corresponds to the runtime.newproc function
- c <- 1 corresponds to the runtime.chansend1 function
- x := <-c corresponds to the runtime.chanrecv1 function
- close(c) corresponds to the runtime.closechan function
A global search finds these functions in the runtime/chan.go file:
func makechan(t *chantype, size int) *hchan {}
func chansend1(c *hchan, elem unsafe.Pointer) {}
func chanrecv1(c *hchan, elem unsafe.Pointer) {}
func closechan(c *hchan) {}
Source code analysis
Each of the functions above works with the hchan type, which is the core data structure of a channel. Let's start with hchan.
The internal data structure of a chan can also be inspected when debugging with breakpoints in an IDE.
- Location: src/runtime/chan.go
Chan data structure
- The internal buffer of a channel is a fixed-length circular queue (ring buffer)
- Data is written in order; when the end of the buffer is reached, writing wraps around to index 0
- The two important components of hchan are buf and waitq
All behavior and implementation revolve around these two components.
The Go Night Reading project on GitHub provides a diagram that illustrates this structure well; it is quoted directly here.
type hchan struct {
	// Total number of elements currently in the queue
	qcount uint // total data in the queue
	// Length of the ring queue, i.e. the buffer size (specified when the channel is created)
	dataqsiz uint // size of the circular queue
	// Pointer to the ring queue
	buf unsafe.Pointer // points to an array of dataqsiz elements
	// Size of each element in buf
	elemsize uint16
	// Whether the channel is closed: 0 when the channel is created, 1 once it is closed
	closed uint32
	// Element type, used when copying values in and out
	elemtype *_type // element type
	// Index in the ring buffer where the next element will be sent
	sendx uint // send index
	// Index in the ring buffer from which the next element will be received
	recvx uint // receive index
	// Queue of goroutines waiting to read messages
	recvq waitq // list of recv waiters
	// Queue of goroutines waiting to write messages
	sendq waitq // list of send waiters
	// Mutex that protects the channel on every read/write operation (send and receive are mutually exclusive)
	lock mutex
}

// Queue of waiting readers or writers; guarantees first in, first out
type waitq struct {
	first *sudog
	last  *sudog
}
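To make the roles of qcount, sendx and recvx concrete, here is a minimal sketch of a fixed-length ring buffer in ordinary Go. The ring type and its push/pop methods are hypothetical names made up for illustration; they are not part of the runtime, and the sketch ignores locking and wait queues entirely.

```go
package main

import "fmt"

// ring is a toy model of the buffer part of hchan (illustrative only).
type ring struct {
	buf    []int // stands in for hchan.buf
	qcount int   // number of elements currently stored
	sendx  int   // index where the next element is written
	recvx  int   // index from which the next element is read
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push stores v at sendx and wraps the index; it reports false when full.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// pop reads from recvx and wraps the index; it reports false when empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the buffer is full
	v, _ := r.pop()
	fmt.Println(v) // 1: first in, first out
	r.push(3)      // written at index 0 after the wrap-around
	fmt.Println(r.sendx, r.recvx, r.qcount) // 1 1 2
}
```

The real hchan does the same index arithmetic, but on raw memory through buf and always while holding lock.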
Create a channel
Summary:
A channel can carry different types of data, and different types occupy different amounts of space. This determines how much memory is needed for hchan itself and for its buf field. The Go source handles the different situations separately. There are three cases:
The general rule is: total memory size = memory required by hchan + memory required by the elements
- Queue length is 0 or element size is 0: only the memory for hchan itself needs to be allocated
- Elements contain no pointers: memory required = hchan size + size of each element * requested queue length, allocated in one block
- Elements contain pointers: buf is allocated separately, and the memory it occupies is the size of each element * the requested queue length
Input:
- chantype: the channel type
- size: the channel buffer size
Output:
- The newly created hchan object
Core process:
- Validate the parameters
- Assign the fields
- Create the buffer storage space (empty buffer, elements without pointers, elements with pointers)
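The three allocation cases above can be sketched as a small decision function. This is an illustration under stated assumptions, not the runtime code: plan and allocPlan are hypothetical names, the header size of 96 bytes is an arbitrary stand-in for hchanSize, and the hasPointers flag stands in for the elem.ptrdata check.

```go
package main

import "fmt"

// plan describes how a toy makechan would allocate memory.
type plan struct {
	hchanAlloc  uintptr // bytes allocated together with the hchan header
	separateBuf uintptr // bytes allocated separately for buf (0 if none)
}

// allocPlan mirrors the three cases. headerSize stands in for hchanSize.
func allocPlan(headerSize, elemSize, n uintptr, hasPointers bool) plan {
	mem := elemSize * n // overflow checks are omitted in this sketch
	switch {
	case mem == 0:
		// Queue length 0 or zero-sized elements: header only.
		return plan{hchanAlloc: headerSize}
	case !hasPointers:
		// No pointers: header and buffer share one allocation.
		return plan{hchanAlloc: headerSize + mem}
	default:
		// Elements contain pointers: the buffer is allocated separately.
		return plan{hchanAlloc: headerSize, separateBuf: mem}
	}
}

func main() {
	const header = 96 // assumed header size, for illustration only

	fmt.Println(allocPlan(header, 8, 0, false))  // like an unbuffered chan int
	fmt.Println(allocPlan(header, 0, 10, false)) // like chan struct{} with capacity 10
	fmt.Println(allocPlan(header, 8, 10, false)) // like chan int with capacity 10
	fmt.Println(allocPlan(header, 8, 10, true))  // like chan *int with capacity 10
}
```

Keeping pointer-free buffers in the same block as the header means the garbage collector never has to scan them, which is the reason the runtime distinguishes the second and third cases.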
// c := make(chan int, size)
// c := make(chan int)
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	// Total buffer size required = size of the channel's element type (elem.size) * size
	// (the full source panics here if overflow is set or mem is out of range; that check is omitted in this excerpt)
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	var c *hchan
	// Create hchan and allocate storage space for buf
	switch {
	case mem == 0:
		// size is 0, or each element occupies 0 bytes
		// Only the space for the hchan structure itself needs to be allocated
		// hchanSize is a constant: the number of bytes required for an empty hchan
		// hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// raceaddr returns unsafe.Pointer(&c.buf); the race detector uses this location for synchronization
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// The elements contain no pointers, so the buffer needs the mem bytes computed above
		// Add the space occupied by hchan itself to get the total allocation
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		// Point buf just past the end of the hchan header
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		// Allocate a separate block of mem bytes for buf to store all the data
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	// Size of each element
	c.elemsize = uint16(elem.size)
	// Element type
	c.elemtype = elem
	// Length of the ring queue, as requested by the user at creation
	c.dataqsiz = uint(size)
	return c
}
Send data to a channel
Summary:
When sending data to a channel, the intuitive model is that the data is put into the channel's ring queue. Go adds an optimization, however: it first checks whether a goroutine is waiting to receive data. If so, it copies the data directly to that goroutine and wakes it up, without going through the queue. There is one more case: if the queue is full, the sender has to wait in a queue until some data has been taken out before it can send.
Input:
- The chan object
- A pointer to the data to be sent
- Whether the call may block
- callerpc: the caller's program counter (used by the race detector, not a callback)
Output: whether the send succeeded
Core logic:
- If recvq is not empty, a goroutine waiting to receive data is dequeued from recvq and the data is sent directly to that goroutine
- If recvq is empty and buf is not full, the data is put into buf
- If buf is full, the data to be sent and the current goroutine are packaged into a sudog object and put into sendq, and the goroutine is put into the waiting state
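The non-blocking variant of this logic can be observed from ordinary Go code. The sketch below uses a select with a default branch, which takes the non-blocking path instead of parking the goroutine when the buffer is full. trySend is a hypothetical helper written for this example, not a runtime or standard library function.

```go
package main

import "fmt"

// trySend performs a non-blocking send and reports whether it succeeded.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		// Buffer full and no receiver waiting: give up instead of blocking.
		return false
	}
}

func main() {
	c := make(chan int, 1)

	fmt.Println(trySend(c, 1)) // true: the buffer has room
	fmt.Println(trySend(c, 2)) // false: the buffer is full and nobody is receiving

	<-c // take one element out so the buffer has room again
	fmt.Println(trySend(c, 3)) // true
}
```

A plain c <- v in the second step would instead follow the last bullet above: the goroutine would be parked in sendq until a receiver arrived.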
A goroutine is waiting to receive data
No goroutine is waiting to receive data, and the ring queue is not full
No goroutine is waiting to receive data, and the ring queue is full
Send data source code
// ep points to the start of the data to be sent
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// Lock first
	lock(&c.lock)
	// Sending on a closed channel panics;
	// this indicates that the channel is being used incorrectly
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// Dequeue a waiting receiver, if there is one, and hand the data to it directly
	if sg := c.recvq.dequeue(); sg != nil {
		// Call send to pass the value
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// No goroutine is waiting to receive data.
	// If the buffer is not full, copy the data to be sent into the buffer
	if c.qcount < c.dataqsiz {
		// c.sendx is the send index; chanbuf finds the slot by pointer offset,
		// equivalent to c.buf[c.sendx]
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// Copy the data into the buffer slot (typedmemmove calls memmove internally, which is implemented in assembly)
		typedmemmove(c.elemtype, qp, ep)
		// Advance the send index; if it reaches the end, start again from 0
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// Increment the element count, release the lock, and return
		c.qcount++
		unlock(&c.lock)
		return true
	}
	// The buffer is full.
	// Non-blocking case: return immediately
	if !block {
		unlock(&c.lock)
		return false
	}
	// Blocking case:
	// the current goroutine and the data to be sent are put into sendq, and the goroutine is switched out
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	// t0 is set earlier in the full source when block profiling is enabled (omitted in this excerpt)
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// Put the goroutine into the sendq queue
	c.sendq.enqueue(mysg)
	// The goroutine enters the waiting state; gopark is the scheduling code.
	// From the user's point of view, the statement that sends data to the channel blocks here
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep)
	// The goroutine has been woken up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	// The goroutine is runnable again and continues execution from here
	releaseSudog(mysg)
	return true
}
Send function
// The data ep to be sent is copied to the receiver sg, which is then woken up to continue execution
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// Copy the data directly to the receiver
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Mark the receiver runnable and put it into the scheduling queue
	goready(gp, skip+1)
}
Read the data
Summary:
Reading data from a channel is essentially the reverse of sending.
A read does not go straight to the ring queue. It first checks whether a goroutine is waiting to send data. If so, it dequeues that goroutine, takes its data, and wakes it up. If no goroutine is waiting to send, the data is fetched from the ring queue.
Input:
- The chan object
- A pointer to the memory that receives the data
- Whether the call may block
Output: two booleans (selected, received); received reports whether a value was actually received
Core logic:
- If a goroutine is waiting to send data, dequeue it from sendq and take its data
- If no goroutine is waiting and there is data in the ring queue, fetch the data from the queue
- If no goroutine is waiting and the ring queue is empty, the current goroutine is blocked: it is packaged as a sudog and added to the recvq wait queue
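The receive paths can be exercised from user code in the same way as the send paths. The sketch below shows the non-blocking receive on an empty buffer and the first-in, first-out order of buffered data. tryRecv is a hypothetical helper written for this example only.

```go
package main

import "fmt"

// tryRecv performs a non-blocking receive and reports whether a value arrived.
func tryRecv(c chan int) (int, bool) {
	select {
	case v := <-c:
		return v, true
	default:
		// Nothing buffered and no sender waiting: return instead of blocking.
		return 0, false
	}
}

func main() {
	c := make(chan int, 2)

	_, ok := tryRecv(c)
	fmt.Println(ok) // false: the ring queue is empty

	c <- 10
	c <- 20
	v1, _ := tryRecv(c)
	v2, _ := tryRecv(c)
	fmt.Println(v1, v2) // 10 20: values come out in the order they went in
}
```

A plain <-c on the empty channel would follow the last bullet above and park the goroutine in recvq until a sender arrived or the channel was closed.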
A goroutine is waiting in sendq
No goroutine is waiting in sendq, and the queue is not empty
No goroutine is waiting in sendq, and the queue is empty
Read data source code
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// Lock first
	lock(&c.lock)
	// If a goroutine is waiting to send data, take the data directly from that sender
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// There is data in the channel's ring queue
	if c.qcount > 0 {
		// Fetch the data at the receive index,
		// equivalent to c.buf[c.recvx]
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// Copy the data to the receiving goroutine
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Clear the slot that was just read
		typedmemclr(c.elemtype, qp)
		// Advance the receive index
		c.recvx++
		// Ring queue: if the index reaches the end, start again from 0
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// Decrement the element count
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	// Non-blocking case: return immediately
	if !block {
		unlock(&c.lock)
		return false, false
	}
	// Blocking case:
	// no data is available, so block the goroutine and add it to the channel's receive queue
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	// t0 is set earlier in the full source when block profiling is enabled (omitted in this excerpt)
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// Add to the receive queue
	c.recvq.enqueue(mysg)
	// Park the goroutine and let the scheduler run something else
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	// The goroutine has been woken up and continues execution from here
	releaseSudog(mysg)
	return true, !closed
}
Close the channel
Input: the channel
Output: none
Core process:
- Set the closed state
- Wake up all goroutines waiting to read from the channel
- Wake up all goroutines waiting to write to the channel; they panic with "send on closed channel"
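The visible effects of closing can be checked with a few lines of ordinary Go. The sketch below shows that buffered data can still be read after close, that further receives return the zero value with ok set to false, and that a send on a closed channel panics. sendPanics is a hypothetical helper that uses recover to turn the panic into a boolean for demonstration purposes.

```go
package main

import "fmt"

// sendPanics reports whether sending v on c panics.
func sendPanics(c chan int, v int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	c <- v
	return false
}

func main() {
	c := make(chan int, 1)
	c <- 7
	close(c)

	v, ok := <-c
	fmt.Println(v, ok) // 7 true: data already in the buffer survives close

	v, ok = <-c
	fmt.Println(v, ok) // 0 false: the channel is closed and drained

	fmt.Println(sendPanics(c, 1)) // true: send on closed channel
}
```

Recovering from this panic is only done here to keep the example running; in real code a send on a closed channel signals a bug in how the channel is shared.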
func closechan(c *hchan) {
	// Closing a nil channel panics
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// Lock first
	lock(&c.lock)
	// Closing an already closed channel panics
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	// Set the closed state
	c.closed = 1
	// Declare a list of Gs to collect all the waiting goroutines.
	// The purpose is to release the lock as soon as possible, because waking the goroutines may need the lock again
	var glist gList
	// release all readers
	// Wake up all goroutines waiting to read data from the channel
	for {
		sg := c.recvq.dequeue()
		// The wait queue is empty: exit the loop
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		// Add to the temporary list
		glist.push(gp)
	}
	// release all writers (they will panic)
	// Collect all goroutines waiting to send data; they panic once they resume
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		// Add to the temporary list
		glist.push(gp)
	}
	unlock(&c.lock)
	// Ready all Gs now that we've dropped the channel lock.
	// Process all goroutines in the temporary list
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// Put it into the scheduling queue to be scheduled
		goready(gp, 3)
	}
}
Conclusion
When I first used channels, they seemed complicated and magical. Out of curiosity I studied the underlying source code, and after reading it I found that it is not that complicated and that the implementation logic is very clear. This article walked through that logic with both diagrams and text, covering channel creation, sending data, receiving data, and closing. The implementation also touches on scheduling and related topics, which will be analyzed in a separate article.
References
- www.cyhone.com/articles/an…
- Github.com/talkgo/nigh…
- talkgo.org/t/topic/75