Preface

Let's start with a diagram of the channel layout. The internals of a channel are not complicated and need no advanced background: the implementation is built around one ring queue and two linked lists. By the end of this article you should have a solid understanding of how channels are implemented.

[Figure: channel layout diagram]

Introduction to channels

  • A channel is a typed pipe through which goroutines send and receive messages
  • It is the means of communication between goroutines that Go provides at the language level

You are probably familiar with using channels in day-to-day development. Once you know the basics, though, are you curious about the underlying implementation and why it allows concurrent goroutines to communicate? With that curiosity in mind, let's look at the runtime source code behind channels.

Using a channel

Here is the simplest way to use a channel:

package main

import "fmt"

func main() {
  c := make(chan int)
  go func() {
    // Send data to the channel
    c <- 1
  }()
  // Receive data from the channel
  x := <-c
  close(c)
  fmt.Println(x)
}

Channel source entry

Constructs such as make and <- that operate on channels have no direct counterpart in the source code; the compiler translates them into calls to the underlying runtime implementation. Use the following command to compile the Go source into assembly:

go tool compile -N -l -S main.go > hello.s

Filtering the output for CALL instructions shows the core calls:

0x0043 00067 (main.go:42) CALL  runtime.makechan(SB)
0x006a 00106 (main.go:44) CALL  runtime.newproc(SB)
0x008b 00139 (main.go:47) CALL  runtime.chanrecv1(SB)
0x0032 00050 (main.go:45) CALL  runtime.chansend1(SB)
0x00a3 00163 (main.go:48) CALL  runtime.closechan(SB)

From this you can infer the correspondence:

  • make(chan int) corresponds to runtime.makechan
  • Starting a goroutine (go func) corresponds to runtime.newproc
  • c <- 1 corresponds to runtime.chansend1
  • x := <-c corresponds to runtime.chanrecv1
  • close(c) corresponds to runtime.closechan

A global search shows that these functions are defined in runtime/chan.go:

func makechan(t *chantype, size int) *hchan {}
func chansend1(c *hchan, elem unsafe.Pointer) {}
func chanrecv1(c *hchan, elem unsafe.Pointer) {}
func closechan(c *hchan) {}

Source code analysis

All four functions above work with the hchan type (makechan returns a *hchan, and the other three take one as a parameter). It is the core data structure of a channel, so let's start with hchan.

You can also inspect the internal data structure of a chan by setting breakpoints in an IDE debugger.

  • Location: src/runtime/chan.go

The chan data structure

  • Internally, a channel's buffer is a fixed-length circular queue (ring buffer)
  • Data is written in order; after the last slot is filled, writing wraps around to index 0
  • The two key components of a chan are buf and waitq; all behavior and implementation revolve around them
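
The wrap-around behavior described above can be sketched with a small ring type. This is only an illustration: the field names are borrowed from hchan, but the type itself, newRing, push, and pop are hypothetical, and the sketch leaves out locking, wait queues, and the unsafe pointer arithmetic that the runtime uses.

```go
package main

import "fmt"

// ring is a toy fixed-size circular queue of ints. Field names mirror hchan,
// but this is an illustration only, not the runtime's implementation.
type ring struct {
	buf      []int
	qcount   int // number of elements currently stored
	dataqsiz int // capacity of the queue
	sendx    int // index of the next slot to write
	recvx    int // index of the next slot to read
}

func newRing(size int) *ring {
	return &ring{buf: make([]int, size), dataqsiz: size}
}

// push stores v at sendx. It reports false if the ring is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around to the start
	}
	r.qcount++
	return true
}

// pop removes the element at recvx. It reports false if the ring is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around to the start
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the ring is full
	v, _ := r.pop()
	fmt.Println(v) // 1
	r.push(3)      // stored in slot 0, which was just freed
	fmt.Println(r.sendx, r.recvx) // 1 1
}
```

Keeping a separate qcount is what lets the queue tell "full" from "empty" when sendx and recvx are equal.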

[Figure: hchan structure diagram, quoted directly from the Go Night Reading project on GitHub]

type hchan struct {
  // Number of elements currently in the queue
  qcount   uint           // total data in the queue
  // Length of the ring queue, i.e. the buffer size given when the channel is created
  dataqsiz uint           // size of the circular queue
  // Pointer to the ring queue's backing array
  buf      unsafe.Pointer // points to an array of dataqsiz elements
  // Size of each element in buf
  elemsize uint16
  // Whether the channel is closed: 0 when created, set to 1 by close
  closed   uint32
  // Element type, used when copying values in and out
  elemtype *_type // element type
  // Index in the ring buffer where the next send will write
  sendx    uint   // send index
  // Index in the ring buffer where the next receive will read
  recvx    uint   // receive index
  // Queue of goroutines waiting to receive
  recvq    waitq  // list of recv waiters
  // Queue of goroutines waiting to send
  sendq    waitq  // list of send waiters
  // Mutex held during every send, receive, and close, so these operations are mutually exclusive
  lock mutex
}

// Wait queue of blocked receivers or senders: a linked list of sudog, served first in, first out
type waitq struct {
  first *sudog
  last  *sudog
}
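
To make the first-in, first-out behavior of waitq concrete, here is a minimal sketch of a linked-list queue with first and last pointers. The waiter type is a hypothetical stand-in for the runtime's sudog, and the enqueue and dequeue methods are simplified: the runtime's list is doubly linked and its dequeue also has to deal with goroutines parked in a select, which this sketch ignores.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for the runtime's sudog.
type waiter struct {
	name string
	next *waiter
}

// waitq is a FIFO linked list with pointers to its first and last node.
type waitq struct {
	first *waiter
	last  *waiter
}

// enqueue appends w to the tail of the queue.
func (q *waitq) enqueue(w *waiter) {
	w.next = nil
	if q.last == nil {
		q.first = w
		q.last = w
		return
	}
	q.last.next = w
	q.last = w
}

// dequeue removes and returns the head of the queue, or nil if it is empty.
func (q *waitq) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	q.first = w.next
	if q.first == nil {
		q.last = nil
	}
	w.next = nil
	return w
}

func main() {
	var q waitq
	q.enqueue(&waiter{name: "g1"})
	q.enqueue(&waiter{name: "g2"})
	fmt.Println(q.dequeue().name)   // g1
	fmt.Println(q.dequeue().name)   // g2
	fmt.Println(q.dequeue() == nil) // true
}
```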

Creating a channel

Summary:

A channel can carry elements of different types, and different types occupy different amounts of space. The element type and the requested capacity therefore determine how much memory hchan and its buf field need. The Go source handles three cases differently.

The general rule is: total memory = memory required by hchan + memory required by the elements

  • Queue length is 0 or element size is 0: only the hchan itself is allocated
  • Elements contain no pointers: a single allocation of hchan size + element size * requested queue length
  • Elements contain pointers: buf is allocated separately from hchan, and its size is element size * requested queue length
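
The three cases can be worked through with a little arithmetic. The sketch below is not runtime code: allocPlan is a hypothetical helper, and the hchanSize value of 96 bytes is an assumption that matches the struct layout on a typical 64-bit platform. Overflow checks are left out.

```go
package main

import "fmt"

// hchanSize is an assumed value for illustration (hchan on a 64-bit platform).
const hchanSize uintptr = 96

// allocPlan is a hypothetical helper that reports how many separate
// allocations a channel needs and how many bytes they total.
func allocPlan(elemSize uintptr, size int, hasPointers bool) (allocs int, total uintptr) {
	mem := elemSize * uintptr(size) // overflow checks omitted in this sketch
	switch {
	case mem == 0:
		// No buffer: only the hchan header is allocated.
		return 1, hchanSize
	case !hasPointers:
		// Header and buffer share one allocation.
		return 1, hchanSize + mem
	default:
		// Header and buffer are allocated separately.
		return 2, hchanSize + mem
	}
}

func main() {
	fmt.Println(allocPlan(0, 10, false)) // chan struct{}, capacity 10: 1 96
	fmt.Println(allocPlan(8, 0, false))  // chan int, unbuffered:       1 96
	fmt.Println(allocPlan(8, 10, false)) // chan int, capacity 10:      1 176
	fmt.Println(allocPlan(8, 10, true))  // chan *int, capacity 10:     2 176
}
```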

Input:

  • chantype: the channel type
  • size: the channel capacity

Output:

  • The newly created hchan object

Core process:

  • Validate the parameters
  • Allocate the buffer storage (no buffer needed, elements without pointers, elements with pointers)
  • Assign the remaining fields


// c := make(chan int, size)
// c := make(chan int)
func makechan(t *chantype, size int) *hchan {
  elem := t.elem

  // Total buffer size required = size of the element type (elem.size) * size
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  // Parameter validation: reject sizes that overflow or exceed the allocation limit
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }

  var c *hchan
  // Allocate hchan and the storage for buf
  switch {
  case mem == 0:
    // size is 0, or each element occupies 0 bytes
    // No buffer is needed, so only the hchan structure itself is allocated
    // hchanSize is a constant: the size of hchan rounded up to a multiple of maxAlign
    // hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // raceaddr returns unsafe.Pointer(&c.buf); the race detector uses this address for synchronization
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:
    // Elements contain no pointers: hchan and the buffer (mem bytes, computed above)
    // are allocated together in one block
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    // buf points just past the hchan header
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    // Elements contain pointers.
    // Allocate hchan on its own and a separate block of mem bytes for buf to hold all the elements
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  // Size of each element
  c.elemsize = uint16(elem.size)
  // Element type
  c.elemtype = elem
  // Capacity of the ring queue, as requested by the caller
  c.dataqsiz = uint(size)
  return c
}

Sending data to a channel

Summary:

Intuitively, sending to a channel means putting the data into the chan's ring queue. Go adds an optimization, however: it first checks whether a goroutine is already waiting to receive. If so, the data is copied directly to that goroutine, which is then woken up, and the queue is bypassed. Otherwise the data goes into the ring queue. If the queue is full, the sender has to wait in the send queue until some data has been taken out.

Input:

  • The chan object
  • A pointer to the data to be sent
  • Whether the call may block
  • The caller's program counter (callerpc), used by the race detector

Output: whether the data was sent

Core logic:

  1. If recvq is not empty, dequeue a goroutine waiting to receive and copy the data directly to it
  2. If recvq is empty and buf is not full, put the data into buf
  3. If buf is full, package the data and the current goroutine into a sudog, enqueue it on sendq, and park the goroutine
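
The three steps above can be modeled with a small single-threaded toy. Everything in this sketch is hypothetical (toyChan, its fields, and the strings it returns); it uses plain slices in place of the ring queue and the sudog lists, and it has no locking or scheduling, so it only shows the order in which the cases are checked.

```go
package main

import "fmt"

// toyChan is a single-threaded model of the send decision, not runtime code.
type toyChan struct {
	capacity int
	buf      []int  // stands in for the ring queue
	recvq    []*int // parked receivers: where each one wants its value stored
	sendq    []int  // values held by parked senders
}

// send follows the three steps in order and reports which one applied.
func (c *toyChan) send(v int) string {
	// Step 1: a receiver is waiting, so copy the value straight to it.
	if len(c.recvq) > 0 {
		dst := c.recvq[0]
		c.recvq = c.recvq[1:]
		*dst = v
		return "direct"
	}
	// Step 2: there is room in the buffer.
	if len(c.buf) < c.capacity {
		c.buf = append(c.buf, v)
		return "buffered"
	}
	// Step 3: the buffer is full, so the sender has to wait.
	c.sendq = append(c.sendq, v)
	return "parked"
}

func main() {
	var slot int
	c := &toyChan{capacity: 1, recvq: []*int{&slot}}
	fmt.Println(c.send(1)) // direct
	fmt.Println(slot)      // 1
	fmt.Println(c.send(2)) // buffered
	fmt.Println(c.send(3)) // parked
}
```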

[Figure: a goroutine is waiting to receive data]

[Figure: no goroutine is waiting to receive, and the ring queue is not full]

[Figure: no goroutine is waiting to receive, and the ring queue is full]

Send data source code

// ep points to the data to be sent
// (the nil-channel check and the lock-free fast path are omitted from this excerpt)
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // t0 records the start time when block profiling is enabled
  var t0 int64
  if blockprofilerate > 0 {
    t0 = cputicks()
  }

  // Lock first
  lock(&c.lock)

  // Sending on a closed channel panics
  // This error means the channel is being used incorrectly
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }

  // Dequeue a waiting receiver. If there is one, pass the data to it directly
  if sg := c.recvq.dequeue(); sg != nil {
    // Call send to copy the value to the receiver and wake it up
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }

  // No goroutine is waiting to receive
  // If the buffer is not full, copy the data into the buffer
  if c.qcount < c.dataqsiz {
    // c.sendx is the send index; chanbuf locates that slot by pointer offset
    // Equivalent to &c.buf[c.sendx]
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }

    // Copy the data into the slot. typedmemmove calls memmove internally,
    // which is implemented in assembly
    typedmemmove(c.elemtype, qp, ep)

    // Advance the send index, wrapping to 0 at the end of the buffer
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }

    // Increment the element count, release the lock, and return
    c.qcount++
    unlock(&c.lock)
    return true
  }

  // The buffer is full
  // Non-blocking case: return immediately
  if !block {
    unlock(&c.lock)
    return false
  }

  // Blocking case
  // Package the current goroutine and the data into a sudog, put it on sendq, and park the goroutine
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil

  // Put the sudog on the sendq queue
  c.sendq.enqueue(mysg)

  // The goroutine enters the waiting state; gopark is scheduler code
  // From the user's point of view, the statement that sends to the channel blocks here
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  // Keep the value being sent alive until the receiver has copied it out
  KeepAlive(ep)

  // The goroutine has been woken up and continues from here
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if gp.param == nil {
    // Woken by close rather than by a receiver
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
  gp.param = nil
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  mysg.c = nil

  // Return the sudog to the pool
  releaseSudog(mysg)
  return true
}
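
The non-blocking branch (block == false) can be observed from ordinary Go code: a select with a single send case and a default branch never parks the goroutine. The helper name trySend below is hypothetical; the behavior it shows is standard Go semantics.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		// Reached when the send cannot proceed immediately, for example
		// because the buffer is full and no receiver is waiting.
		return false
	}
}

func main() {
	c := make(chan int, 2)
	fmt.Println(trySend(c, 1)) // true: stored in the buffer
	fmt.Println(trySend(c, 2)) // true: stored in the buffer
	fmt.Println(trySend(c, 3)) // false: buffer full, no receiver waiting
	<-c                        // free one slot
	fmt.Println(trySend(c, 3)) // true
}
```

A plain `c <- v` statement, by contrast, takes the blocking path and parks the goroutine on sendq when the buffer is full.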

The send function

// Copy the data ep to the receiver sg, then wake the receiver so it continues running
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {

  // Copy the data directly into the receiver's memory
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  // Mark the receiver runnable so the scheduler can run it
  goready(gp, skip+1)
}

Receiving data from a channel

Summary:

Receiving from a channel mirrors sending; it is essentially the reverse process.

A receive does not go straight to the ring queue. It first checks whether a goroutine is waiting to send. If so, it dequeues that goroutine, obtains the data, and wakes the sender up (for a buffered channel, the receiver takes the element at the head of the ring queue and the sender's value is moved into the queue). If no goroutine is waiting to send, the data is fetched from the ring queue.

Input:

  • The chan object
  • A pointer to where the received data should be stored
  • Whether the call may block

Output: whether the receive was selected, and whether a value was actually received

Core logic:

  1. If a goroutine is waiting to send, dequeue it from sendq and take the data
  2. If no goroutine is waiting and the ring queue holds data, fetch the data from the queue
  3. If no goroutine is waiting and the ring queue is empty, package the current goroutine into a sudog, add it to the recvq wait queue, and park it
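
The receive steps can be modeled in the same single-threaded style. As before, toyChan and the strings it returns are hypothetical and slices stand in for the ring queue and wait queue. The first branch also illustrates what the runtime's recv helper does when a sender is parked on a full buffered channel: the receiver takes the oldest buffered element and the sender's value takes the freed slot, so FIFO order is preserved.

```go
package main

import "fmt"

// toyChan is a single-threaded model of the receive decision, not runtime code.
type toyChan struct {
	capacity int
	buf      []int // stands in for the ring queue
	sendq    []int // values held by parked senders
}

// recv follows the three steps in order and reports which one applied.
func (c *toyChan) recv() (v int, how string) {
	// Step 1: a sender is waiting.
	if len(c.sendq) > 0 {
		sv := c.sendq[0]
		c.sendq = c.sendq[1:]
		if len(c.buf) == 0 {
			// Unbuffered channel: take the value straight from the sender.
			return sv, "direct"
		}
		// Buffer is full: take its head and move the sender's value to the tail.
		v = c.buf[0]
		c.buf = append(c.buf[1:], sv)
		return v, "buffer+refill"
	}
	// Step 2: no sender is waiting, but the buffer holds data.
	if len(c.buf) > 0 {
		v = c.buf[0]
		c.buf = c.buf[1:]
		return v, "buffered"
	}
	// Step 3: nothing to receive, so the receiver has to wait.
	return 0, "parked"
}

func main() {
	c := &toyChan{capacity: 1, buf: []int{1}, sendq: []int{2}}
	fmt.Println(c.recv()) // 1 buffer+refill
	fmt.Println(c.recv()) // 2 buffered
	fmt.Println(c.recv()) // 0 parked
}
```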

[Figure: a goroutine is waiting in sendq]

[Figure: no goroutine is waiting in sendq, and the queue is not empty]

[Figure: no goroutine is waiting in sendq, and the queue is empty]

Read data source code

// (the nil-channel check, the lock-free fast path, and the closed-and-empty check are omitted from this excerpt)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // t0 records the start time when block profiling is enabled
  var t0 int64
  if blockprofilerate > 0 {
    t0 = cputicks()
  }

  // Lock
  lock(&c.lock)
  // If a goroutine is waiting to send, take the data directly from that sender
  if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }

  // The ring queue holds data
  if c.qcount > 0 {
    // Fetch the data at the receive index
    // Equivalent to &c.buf[c.recvx]
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    // Copy the data to the receiving goroutine
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // Clear the slot that was just read
    typedmemclr(c.elemtype, qp)
    // Advance the receive index
    c.recvx++
    // Ring queue: wrap to 0 at the end
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    // Decrement the element count
    c.qcount--
    unlock(&c.lock)
    return true, true
  }

  // Non-blocking case: return immediately
  if !block {
    unlock(&c.lock)
    return false, false
  }

  // Blocking case
  // Execution reaches here when no data is available: park the goroutine on the channel's receive queue
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.isSelect = false
  mysg.c = c
  gp.param = nil

  // Add to the receive queue
  c.recvq.enqueue(mysg)
  // Park the goroutine and let the scheduler run something else
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

  // someone woke us up
  if mysg != gp.waiting {
    throw("G waiting list is corrupted")
  }
  gp.waiting = nil
  gp.activeStackChans = false
  if mysg.releasetime > 0 {
    blockevent(mysg.releasetime-t0, 2)
  }
  // gp.param is nil when the wake-up came from close rather than from a sender
  closed := gp.param == nil
  gp.param = nil
  mysg.c = nil

  // Return the sudog to the pool
  releaseSudog(mysg)
  return true, !closed
}
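
The second return value, received, is what user code sees as ok in the two-value receive form `v, ok := <-c`. The short example below shows this from the outside; the helper name describe is hypothetical. Values still in the buffer are delivered after close, and only once the buffer is empty does a receive yield the zero value with ok == false.

```go
package main

import "fmt"

// describe performs one two-value receive and formats the result.
func describe(c <-chan int) string {
	v, ok := <-c
	return fmt.Sprintf("%d %t", v, ok)
}

func main() {
	c := make(chan int, 1)
	c <- 7
	close(c)

	fmt.Println(describe(c)) // 7 true: buffered data survives close
	fmt.Println(describe(c)) // 0 false: closed and empty
}
```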

Closing a channel

Input: the channel

Output: none

Core process:

  • Set the closed flag
  • Wake up all goroutines waiting to receive from the channel
  • Wake up all goroutines waiting to send to the channel; each of them panics

func closechan(c *hchan) {
  // Closing a nil channel panics
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  // Lock
  lock(&c.lock)

  // Closing an already closed channel panics
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }

  // Mark the channel as closed
  c.closed = 1

  // Collect all waiting goroutines in a temporary list so they can be woken
  // after the lock is released. This keeps the lock held for as short a time as possible
  var glist gList

  // release all readers
  // Wake up all goroutines waiting to receive from the channel
  for {
    sg := c.recvq.dequeue()
    // The wait queue has been drained: exit the loop
    if sg == nil {
      break
    }
    // Give the receiver the zero value
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    // Add to the temporary list
    glist.push(gp)
  }

  // release all writers (they will panic)
  // Handle all goroutines waiting to send; they panic once they resume
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    // Add to the temporary list
    glist.push(gp)
  }
  unlock(&c.lock)

  // Ready all Gs now that we've dropped the channel lock.
  // Process every goroutine in the temporary list
  for !glist.empty() {
    gp := glist.pop()
    gp.schedlink = 0

    // Mark the goroutine runnable so the scheduler can run it
    goready(gp, 3)
  }
}
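
The effects of close can be checked from user code. The sketch below uses two hypothetical helpers, panics and closeWakesAll. It does not guarantee that every receiver has already parked on recvq when close runs, but the result is the same either way: each receive returns the zero value with ok == false.

```go
package main

import (
	"fmt"
	"sync"
)

// panics runs f and reports whether it panicked.
func panics(f func()) (p bool) {
	defer func() {
		if recover() != nil {
			p = true
		}
	}()
	f()
	return false
}

// closeWakesAll starts n receivers on an unbuffered channel, closes it, and
// counts how many receivers got the zero value with ok == false.
func closeWakesAll(n int) int {
	c := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, ok := <-c
			if v == 0 && !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(c)
	wg.Wait()
	return woken
}

func main() {
	fmt.Println(closeWakesAll(3)) // 3

	c := make(chan int)
	close(c)
	fmt.Println(panics(func() { c <- 1 }))   // true: send on closed channel
	fmt.Println(panics(func() { close(c) })) // true: close of closed channel

	var nilChan chan int
	fmt.Println(panics(func() { close(nilChan) })) // true: close of nil channel
}
```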

Conclusion

When I first used channels, they seemed complicated and almost magical. That curiosity led me to study the underlying source code, and after reading it I found that the implementation is not that complicated and its logic is very clear. This article walked through that logic with diagrams and text, covering channel creation, sending, receiving, and closing. Channels also depend on the scheduler and other parts of the runtime, which will be covered in a separate article.

References

  • www.cyhone.com/articles/an…
  • github.com/talkgo/nigh…
  • talkgo.org/t/topic/75