Go Channels: From the Basics to the Internals

Channels are one of Go's core features, so understanding how they work is essential to learning and using the language.

A channel is a way for goroutines to communicate with each other, analogous to a pipe between processes in Unix.

CSP model

Before discussing channels, it is worth mentioning the CSP model. Traditional concurrency models fall mainly into two families: the Actor model and the CSP model. CSP stands for Communicating Sequential Processes. The model consists of concurrently executing entities (processes, threads, or coroutines) and the message channels through which those entities communicate by sending messages. Unlike the Actor model, CSP focuses on the carrier of the message, the channel, rather than on the entity that sends it. Readers who want a deeper introduction to CSP can read the paper Communicating Sequential Processes. Go's concurrency model is based on CSP theory: the executing entity corresponds to a goroutine, and the message channel corresponds to a channel.

Introducing channels

A channel provides a communication mechanism through which one goroutine can send messages to another. Every channel is associated with a type, namely the type of the values it carries. For example, a channel that carries int values is written chan int.

Read and write operations on a channel

ch := make(chan int)

// write to the channel
ch <- x

// read from the channel into a new variable
x := <-ch

// another way to read, assigning to an existing variable
x = <-ch

A channel must be initialized with make before it can be used. Sending to or receiving from an uninitialized (nil) channel blocks forever.
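A small sketch of the nil-channel behavior described above. Blocking forever is awkward to demonstrate directly, so this example uses a select with a default branch to ask whether a receive could proceed right now. The helper name readyToReceive is made up for this illustration.

```go
package main

import "fmt"

// readyToReceive reports whether a receive on ch can proceed right now.
// If it can, the value is consumed. The default branch runs when the
// receive would block.
func readyToReceive(ch <-chan int) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	var nilCh chan int // declared but never made: its value is nil

	buffered := make(chan int, 1)
	buffered <- 42

	fmt.Println(readyToReceive(nilCh))    // false: a nil channel is never ready
	fmt.Println(readyToReceive(buffered)) // true: a value is waiting in the buffer
}
```

Without the default branch, the receive on nilCh would simply never return.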

Closing a channel

Go provides the built-in close function to close a channel.

ch := make(chan int)

close(ch)

When closing a channel, you should note the following:

  • Closing an uninitialized (nil) channel causes a panic
  • Closing the same channel more than once causes a panic
  • Sending a message to a closed channel causes a panic
  • Receiving from a closed channel does not panic. Any messages still in the buffer can be read; once they have all been read, each receive returns the zero value of the element type. A receive from a closed channel never blocks, and in the comma-ok form the second value is false once the channel is closed and drained, which can be used to detect that the channel has been closed
  • Closing a channel acts as a broadcast: every goroutine blocked receiving from the channel is woken up and its receive returns

ch := make(chan int, 10)
ch <- 11
ch <- 12

close(ch)

for x := range ch {
    fmt.Println(x)
}

x, ok := <-ch
fmt.Println(x, ok)

-----
output:

11
12
0 false
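The panic and broadcast rules above can be checked with a short program. This is a minimal sketch: the helpers panics and wakeAll are hypothetical names introduced only for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// panics runs f and reports whether it panicked.
func panics(f func()) (didPanic bool) {
	defer func() {
		if recover() != nil {
			didPanic = true
		}
	}()
	f()
	return false
}

// wakeAll starts n goroutines that receive from done, closes done once,
// and returns how many of them were released by that single close.
func wakeAll(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // returns the zero value once done is closed
			mu.Lock()
			woken++
			mu.Unlock()
		}()
	}
	close(done) // one close releases every receiver
	wg.Wait()
	return woken
}

func main() {
	ch := make(chan int)
	close(ch)
	var nilCh chan int

	fmt.Println(panics(func() { ch <- 1 }))      // true: send on closed channel
	fmt.Println(panics(func() { close(ch) }))    // true: close of closed channel
	fmt.Println(panics(func() { close(nilCh) })) // true: close of nil channel
	fmt.Println(panics(func() { <-ch }))         // false: receiving is safe
	fmt.Println(wakeAll(5))                      // 5
}
```

Because a receive from a closed channel never blocks, wakeAll returns n whether or not every goroutine was already waiting at the moment of the close.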

Using channels

Goroutine communication

Look at an example from Effective Go:

c := make(chan int)  // Allocate a channel.

// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()

doSomethingForAWhile()
<-c

The main goroutine blocks on <-c until the goroutine that runs the sort completes.

Traversing with range

A channel can also be consumed with range: the loop keeps reading from the channel until some goroutine closes that channel.

// consumer worker
ch := make(chan int, 10)
for x := range ch {
    fmt.Println(x)
}

This is equivalent to:

for {
    x, ok := <-ch
    if !ok {
        break
    }

    fmt.Println(x)
}
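To see that the two loop forms behave the same way, here is a runnable sketch that drains identical channels with each of them. The helper names filled, viaRange, and viaLoop are invented for this example.

```go
package main

import "fmt"

// filled returns a closed channel that still holds the given values.
func filled(vals ...int) chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// viaRange drains ch with a range loop.
func viaRange(ch <-chan int) []int {
	var out []int
	for x := range ch {
		out = append(out, x)
	}
	return out
}

// viaLoop drains ch with the explicit comma-ok form.
func viaLoop(ch <-chan int) []int {
	var out []int
	for {
		x, ok := <-ch
		if !ok {
			break
		}
		out = append(out, x)
	}
	return out
}

func main() {
	fmt.Println(viaRange(filled(1, 2, 3))) // [1 2 3]
	fmt.Println(viaLoop(filled(1, 2, 3)))  // [1 2 3]
}
```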

Using channels with select

select is similar to I/O multiplexing: it can wait on several channels at once, as shown in the following example:

select {
    case <-ch1:
    ...
    case <-ch2:
    ...
    case ch3 <- 10:
    ...
    default:
    ...
}

  • select can wait on sends and receives on multiple channels at the same time
  • When a select executes, if exactly one case can proceed (without blocking), that case's block is executed
  • If several cases can proceed, one of them is chosen at random and executed
  • If every case would block and a default branch is defined, the default branch is executed. If there is no default branch, the select statement blocks until one of the cases can proceed.
  • break exits the select block.
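One common use of the default rule above is a non-blocking send or receive. The following is a minimal sketch; trySend and tryRecv are hypothetical helper names, not standard library functions.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // the send would block
		return false
	}
}

// tryRecv attempts a receive without blocking.
func tryRecv(ch <-chan int) (v int, ok bool) {
	select {
	case v = <-ch:
		return v, true
	default: // the receive would block
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 10)) // true: the buffer has room
	fmt.Println(trySend(ch, 20)) // false: the buffer is full, default runs
	fmt.Println(tryRecv(ch))     // 10 true
	fmt.Println(tryRecv(ch))     // 0 false
}
```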

1. Setting a timeout

ch := make(chan struct{})

// doTask sends a message on ch when the task finishes
go doTask(ch)

timeout := time.After(5 * time.Second)
select {
    case <-ch:
        fmt.Println("task finished.")
    case <-timeout:
        fmt.Println("task timeout.")
}
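A self-contained variant of the timeout pattern, for readers who want to run it. The function waitOrTimeout and the durations are assumptions chosen for the example; the goroutine stands in for doTask.

```go
package main

import (
	"fmt"
	"time"
)

// waitOrTimeout reports whether done was signalled before d elapsed.
func waitOrTimeout(done <-chan struct{}, d time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

func main() {
	fast := make(chan struct{})
	go func() {
		time.Sleep(10 * time.Millisecond) // hypothetical short task
		close(fast)
	}()
	fmt.Println(waitOrTimeout(fast, time.Second)) // true

	never := make(chan struct{}) // nothing ever signals this channel
	fmt.Println(waitOrTimeout(never, 50*time.Millisecond)) // false
}
```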

2. Quit channel

In some scenarios, worker goroutines need to keep processing messages in a loop until they receive a quit signal.

msgCh := make(chan struct{})
quitCh := make(chan struct{})
for {
    select {
    case <-msgCh:
        doWork()
    case <-quitCh:
        finish()
        return
    }
}
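Here is a runnable sketch of the quit-channel pattern. The names worker and run, and the result channel used to report how many messages were handled, are additions for this illustration; counting stands in for doWork and reporting stands in for finish.

```go
package main

import "fmt"

// worker counts messages until quit is closed, then reports the count.
func worker(msgs <-chan int, quit <-chan struct{}, result chan<- int) {
	handled := 0
	for {
		select {
		case <-msgs:
			handled++ // stand-in for doWork()
		case <-quit:
			result <- handled // stand-in for finish()
			return
		}
	}
}

// run sends n messages to a worker and then tells it to quit.
func run(n int) int {
	// Unbuffered: each send returns only after the worker has received it,
	// so no message is still pending when quit is closed.
	msgs := make(chan int)
	quit := make(chan struct{})
	result := make(chan int)
	go worker(msgs, quit, result)
	for i := 0; i < n; i++ {
		msgs <- i
	}
	close(quit)
	return <-result
}

func main() {
	fmt.Println(run(3)) // 3
}
```

Closing quit rather than sending on it means the same signal would also stop several workers at once.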

One-way channels

As the name suggests, a one-way channel is either send-only or receive-only.

func foo(ch chan<- int) <-chan int {...}

chan<- int denotes a send-only channel, and <-chan int denotes a receive-only channel. The signature above says that foo may only send on ch and that the channel it returns may only be received from. Ordinary bidirectional channels would also work here, but stating the direction in the declaration prevents the channel from being misused, and the check is enforced at compile time.
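A short sketch of directional channel types in function signatures. The functions produce and sum are hypothetical; the point is that a bidirectional channel converts implicitly to either direction at the call site.

```go
package main

import "fmt"

// produce can only send on out; a receive from out here would not compile.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// sum can only receive from in; a send or close here would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converted implicitly at each call
	go produce(ch, 4)
	fmt.Println(sum(ch)) // 10
}
```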

Channel source code analysis

The main implementation of channels is in src/runtime/chan.go. The source code below is based on go1.9.2. While reading it, it helps to recall the channel behaviors described in the previous sections; this makes the code easier to follow and leads to more correct and sensible use of channels.

The channel struct

The channel type is defined as follows:

// Channel type definition
type hchan struct {
    // The number of elements in a channel, len
    qcount   uint           // total data in the queue
    
    // Channel size, cap
    dataqsiz uint           // size of the circular queue
    
    // Channel buffer, implemented as a circular array
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    
    // The size of a single element
    elemsize uint16
    
    // The flag bit of closed
    closed   uint32
    
    // Type of the element
    elemtype *_type // element type
    
    // Send and receive indexes, used to implement the circular array queue
    sendx    uint   // send index
    recvx    uint   // receive index
    
    // Recv goroutine wait queue
    recvq    waitq  // list of recv waiters
    
    // send goroutine wait queue
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

// A list implementation of waiting queues
type waitq struct {    
    first *sudog       
    last  *sudog       
}

// in src/runtime/runtime2.go
// sudog wraps a g (goroutine) that is waiting in a queue
type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

As you can see, a channel has three main components: a queue implemented as a circular array, which stores message elements; two goroutine wait queues implemented as linked lists, which hold the goroutines blocked on receive and send operations; and a mutex that synchronizes changes to the channel's fields.
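The circular array can be illustrated with a toy model. This is not the runtime code: the ring type below is a hypothetical simplification that stores ints, has no lock and no wait queues, and borrows the field names qcount, sendx, and recvx from hchan only for readability.

```go
package main

import "fmt"

// ring is a toy model of the buffer part of hchan (ints only, no locking).
type ring struct {
	buf    []int // plays the role of buf; len(buf) plays the role of dataqsiz
	qcount int   // number of stored elements
	sendx  int   // next slot to write
	recvx  int   // next slot to read
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push stores v at sendx and reports false when the ring is full.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// pop removes the element at recvx and reports false when the ring is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	fmt.Println(r.push(1), r.push(2), r.push(3)) // true true false
	fmt.Println(r.pop())                         // 1 true
	fmt.Println(r.push(4))                       // true (reuses slot 0)
	fmt.Println(r.pop())                         // 2 true
	fmt.Println(r.pop())                         // 4 true
}
```

In the real channel, the points where push and pop return false are where a goroutine would be parked in sendq or recvq instead.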

How make creates a channel

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan

    if elem.kind&kindNoPointers != 0 || size == 0 {
        // case 1: channel has no pointer
        // case 2: size == 0, i.e. no buffer channel
        // Allocate memory in one call.
        // Hchan does not contain pointers interesting for GC in this case:
        // buf points into the same allocation, elemtype is persistent.
        // SudoG's are referenced from their owning thread so they can't be collected.
        // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
        
        // Allocate contiguous space on the heap for channels
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // Elements contain pointers and size > 0: allocate hchan and buf separately
        c = new(hchan)
        // Allocate the buf array on the heap
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

The make process is relatively simple. The one point worth noting is that when the element type contains no pointers (or the channel is unbuffered), the hchan header and its buffer are allocated together as one contiguous block of memory.
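From user code, the effect of makechan can be observed through len and cap, which report the number of queued elements and the buffer size, and through the size check, which panics for a negative run-time size. The helper makeSized is a hypothetical wrapper written for this sketch.

```go
package main

import "fmt"

// makeSized calls make with a run-time size and reports whether it panicked.
func makeSized(n int) (ch chan int, panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	return make(chan int, n), false
}

func main() {
	ch, _ := makeSized(3)
	ch <- 7
	fmt.Println(len(ch), cap(ch)) // 1 3

	unbuffered, _ := makeSized(0)
	fmt.Println(len(unbuffered), cap(unbuffered)) // 0 0

	_, panicked := makeSized(-1)
	fmt.Println(panicked) // true: the size is out of range
}
```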

channel send

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    // As mentioned in the previous section, sending on an uninitialized (nil) channel blocks forever
    if c == nil {
        if !block {
            return false
        }

        // gopark puts the current goroutine to sleep. The unlockf passed in is nil and nothing will ever wake this goroutine, so it sleeps forever
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    if raceenabled {
        racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation.
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // Get the synchronization lock
    lock(&c.lock)

    // As mentioned in the previous section, sending a message to a closed channel causes a panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // CASE1: a goroutine is waiting in the recv queue; bypass the buffer and hand the message directly to the receiver goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // CASE2: the buffer is not full, so the message is copied into the buffer
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    if !block {
        unlock(&c.lock)
        return false
    }

    // CASE3: the buffer is full; add the goroutine to the send queue
    // Initialize the sudog
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // Join the queue
    c.sendq.enqueue(mysg)
    // Sleep
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // The goroutine has been woken up
    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    return true
}
    

As the send code shows, several of the behaviors described in the previous sections are visible directly in the code.

A send falls into one of the following scenarios:

  • A goroutine is blocked in the channel's recv queue, which means the buffer is empty. The message is handed directly to the receiver goroutine, so only one copy is made
  • The channel's buffer has free space. The data is placed in the buffer to wait for a receiver, so by the time it has been received, two copies have been made in total
  • The channel's buffer is full (or the channel is unbuffered and no receiver is waiting). The current goroutine is added to the send queue and blocks.
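The order in which a blocking send checks these conditions can be summarized as a small pure function. This is only a sketch of the decision order, not runtime code: toyChan and sendPath are hypothetical names, and the model ignores locking and the non-blocking fast path.

```go
package main

import "fmt"

// toyChan holds just the state that a blocking send inspects (hypothetical model).
type toyChan struct {
	closed       bool
	waitingRecvs int // goroutines parked in the recv queue
	qcount       int // elements currently in the buffer
	dataqsiz     int // buffer capacity
}

// sendPath names the branch a blocking send would take, checked in the
// same order as the send code: closed, waiting receiver, buffer space, block.
func sendPath(c toyChan) string {
	switch {
	case c.closed:
		return "panic"
	case c.waitingRecvs > 0:
		return "direct"
	case c.qcount < c.dataqsiz:
		return "buffer"
	default:
		return "block"
	}
}

func main() {
	fmt.Println(sendPath(toyChan{closed: true}))           // panic
	fmt.Println(sendPath(toyChan{waitingRecvs: 1}))        // direct
	fmt.Println(sendPath(toyChan{qcount: 1, dataqsiz: 2})) // buffer
	fmt.Println(sendPath(toyChan{qcount: 2, dataqsiz: 2})) // block
	fmt.Println(sendPath(toyChan{}))                       // block (unbuffered, no receiver)
}
```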

channel receive

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    // Receiving from a nil channel blocks forever
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not ready for receiving, we observe that the
    // channel is not closed. Each of these observations is a single word-sized read
    // (first c.sendq.first or c.qcount, and second c.closed).
    // Because a channel cannot be reopened, the later observation of the channel
    // being not closed implies that it was also not closed at the moment of the
    // first observation. We behave as if we observed the channel at that moment
    // and report that the receive cannot proceed.
    //
    // The order of operations is important here: reversing the operations can lead to
    // incorrect behavior when racing with a close.
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)
    
    // CASE1: receiving from a closed, empty channel returns the zero value
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

    // CASE2: the send queue is not empty
    // CASE2.1: the buffer is empty (unbuffered channel); receive the element directly from the sender
    // CASE2.2: the buffer is not empty, which can only mean that it is full. The element at the head of the buffer is copied to the receiver, the sender's element is copied into the slot that was just freed, and the sender is woken up. Because the buffer is a circular queue, only the head and tail indexes move; no elements are shifted
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // CASE3: the buffer is not empty; take the element from the buffer and advance the head index
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    
    if !block {
        unlock(&c.lock)
        return false, false
    }

    // CASE4: the buffer is empty; add the goroutine to the recv queue and block
    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}
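The full-buffer case with a waiting sender (CASE2.2 in the comments above) is the least obvious branch, so here is a toy model of the index arithmetic. It is a sketch under simplifying assumptions: recvFromFull is a hypothetical function working on a plain int slice, with no locking and no goroutine wake-up.

```go
package main

import "fmt"

// recvFromFull models a receive from a full ring while a sender is waiting.
// buf is the full ring, recvx the head index, and pending the waiting
// sender's value. It returns the received value and the new head index.
// The ring is still full afterwards, so the new tail index equals the new head.
func recvFromFull(buf []int, recvx int, pending int) (got int, next int) {
	got = buf[recvx]     // copy the head element to the receiver
	buf[recvx] = pending // the sender's element takes over the freed slot
	next = recvx + 1
	if next == len(buf) {
		next = 0 // wrap around
	}
	return got, next
}

func main() {
	buf := []int{10, 20, 30} // full ring, head at index 0
	got, head := recvFromFull(buf, 0, 40)
	fmt.Println(got, head, buf) // 10 1 [40 20 30]
}
```

Reading from the new head at index 1 yields 20, 30, and then 40, so first-in-first-out order is preserved without moving any element.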

channel close

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    
    // Closing an already closed channel panics
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc(unsafe.Pointer(&c))
        racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
        racerelease(unsafe.Pointer(c))
    }

    c.closed = 1

    var glist *g

    // Wake up all receivers
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }

    // Wake up all senders; they will panic
    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        gp.schedlink.set(glist)
        glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
        gp := glist
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
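The second loop in closechan wakes senders that are blocked on the channel, and each of them then panics. The sketch below shows that from user code. The function senderPanicsOnClose is a hypothetical helper, and the short sleep is only an attempt to let the sender park before the close; it is not a guarantee. The sender panics either way, since a send that starts after the close panics as well.

```go
package main

import (
	"fmt"
	"time"
)

// senderPanicsOnClose starts a sender on an unbuffered channel that has no
// receiver, closes the channel, and reports whether the sender panicked.
func senderPanicsOnClose() bool {
	ch := make(chan int)
	result := make(chan bool)
	go func() {
		// Report whether the send below panicked.
		defer func() { result <- recover() != nil }()
		ch <- 1 // no receiver exists, so this send cannot complete normally
	}()
	time.Sleep(10 * time.Millisecond) // give the sender time to park (not guaranteed)
	close(ch)
	return <-result
}

func main() {
	fmt.Println(senderPanicsOnClose()) // true
}
```

This is why a channel is usually closed by its sender, or only after all senders are known to have finished.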

Author: not wise fish. Link: www.jianshu.com/p/24ede9e90… Copyright belongs to the author. For commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.