Introduction
The channel is a core feature of the Go language, and there are many questions worth discussing around it, such as:
- Why is a channel concurrency safe?
- What is the difference between synchronous and asynchronous channels?
- Why does a channel block a coroutine?
- How can a coroutine blocked on a channel be unblocked?
To understand the essence, we need to look at the source code; after all, there are no secrets under the source code.
The principle
Creation
In principle there are three kinds of channels: buffered, unbuffered, and nil, written as follows:
// buffered
ch := make(chan Task, 3)
// unbuffered
ch := make(chan int)
// nil
var ch chan int
The built-in make is declared in builtin/builtin.go as func make(t Type, size ...IntegerType) Type, but that is only a declaration. In simple terms, the compiler's handling of make for channels can be seen in the typecheck1 function in cmd/compile/internal/gc/typecheck.go:
// The result of typecheck1 MUST be assigned back to n, e.g.
// n.Left = typecheck1(n.Left, top)
func typecheck1(n *Node, top int) (res *Node) {
if enableTrace && trace {
defer tracePrint("typecheck1", n)(&res)
}
switch n.Op {
case OMAKE:
ok |= ctxExpr
args := n.List.Slice()
if len(args) == 0 {
yyerror("missing argument to make")
n.Type = nil
return n
}
n.List.Set(nil)
l := args[0]
l = typecheck(l, Etype)
t := l.Type
if t == nil {
n.Type = nil
return n
}
i := 1
switch t.Etype {
default:
yyerror("cannot make type %v", t)
n.Type = nil
return n
case TCHAN:
l = nil
if i < len(args) {
l = args[i]
i++
l = typecheck(l, ctxExpr)
l = defaultlit(l, types.Types[TINT])
if l.Type == nil {
n.Type = nil
return n
}
if !checkmake(t, "buffer", l) {
n.Type = nil
return n
}
n.Left = l
} else {
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN // The corresponding function position
}
if i < len(args) {
yyerror("too many arguments to make(%v)", t)
n.Op = OMAKE
n.Type = nil
return n
}
n.Type = t
if (top&ctxStmt != 0) && top&(ctxCallee|ctxExpr|Etype) == 0 && ok&ctxStmt == 0 {
if !n.Diag() {
yyerror("%v evaluated but not used", n)
n.SetDiag(true)
}
n.Type = nil
return n
}
return n
}
}
The actual implementation lives in runtime/chan.go:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")}ifhchanSize%maxAlign ! =0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")}return c
}
As you can see from this function, the underlying data structure of a channel is hchan.
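A side note on the size check above (a minimal example of my own, not from the original post): a negative buffer size that is a compile-time constant is rejected earlier by checkmake during type checking, but a non-constant size only fails at runtime, via the panic shown in makechan.
package main

func main() {
	n := -1
	// For a non-constant size the check happens in makechan at runtime,
	// so this line panics with "makechan: size out of range".
	_ = make(chan int, n)
}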
Structure
Let’s look at the data structure of a channel. Based on the data structure, we can infer the implementation.
runtime/chan.go
type hchan struct {
// The total amount of data in the channel queue
qcount uint // total data in the queue
// The capacity of the loop queue is 0 if the channel is not buffered
dataqsiz uint // size of the circular queue
// Buffer queue, array type.
buf unsafe.Pointer // points to an array of dataqsiz elements
// The size in bytes of each element
elemsize uint16
// The current queue closure flag bit. Non-zero indicates that the queue is closed
closed uint32
// Type of the element in the queue
elemtype *_type // element type
// Queue send index
sendx uint // send index
// Queue receive index
recvx uint // receive index
// The queue of Gs waiting to receive from the channel.
recvq waitq // list of recv waiters
// The G queue that sends data to the channel.
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
// global lock
lock mutex
}
Looking at the hchan data structure together with the makechan function, several fields are worth explaining:
- dataqsiz is the capacity of the channel, or 0 for an unbuffered channel; the circular queue is implemented on top of dataqsiz
- buf holds the actual data
- sendx and recvx are the positions in the ring queue where data enters and leaves the channel
- sendq holds the queue of goroutines waiting to send data to the channel
- recvq holds the queue of goroutines waiting to receive data from the channel
- lock is the channel-wide lock protecting all of the above
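These fields are also what the built-in len and cap report: len(ch) returns qcount and cap(ch) returns dataqsiz. A minimal sketch:
package main

import "fmt"

func main() {
	ch := make(chan int, 3) // dataqsiz = 3
	ch <- 1
	ch <- 2 // qcount = 2

	fmt.Println(len(ch), cap(ch)) // prints "2 3"
}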
Answers
With the traced code, we can answer our first few questions.
Why is a channel concurrency safe?
Every operation on a channel first acquires the channel's lock, and the operation proceeds only after the lock is held, which guarantees concurrency safety.
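As a small illustration (an example of my own, assuming nothing beyond the standard library): many goroutines send to the same channel concurrently, and because every send takes the channel's lock, no value is lost or duplicated.
package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int, 100)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			ch <- v // each send acquires the channel's internal lock
		}(i)
	}
	wg.Wait()
	close(ch)

	sum := 0
	for v := range ch {
		sum += v
	}
	fmt.Println(sum) // always 4950: nothing lost, nothing duplicated
}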
What is the difference between synchronous and asynchronous channels?
The underlying data structure and operation code are the same; only dataqsiz differs: it is 0 for a synchronous (unbuffered) channel and positive for an asynchronous (buffered) one.
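In practice the difference shows up as whether a send can complete without a receiver being ready. A minimal sketch of my own:
package main

import "fmt"

func main() {
	buffered := make(chan int, 1) // dataqsiz = 1, asynchronous
	buffered <- 1                 // returns immediately, the value sits in buf
	fmt.Println(<-buffered)

	unbuffered := make(chan int) // dataqsiz = 0, synchronous
	go func() {
		unbuffered <- 2 // blocks until the receiver below is ready
	}()
	fmt.Println(<-unbuffered) // sender and receiver rendezvous directly
}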
Why does a channel block a coroutine?
A coroutine blocks when it writes to a channel whose buffer is already full, or when it reads from a channel that has no data.
The implementation principle is strongly related to Golang’s GMP model of concurrent scheduling.
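Both blocking situations can be observed without actually blocking by using select with a default branch, which compiles down to the non-blocking (block == false) calls to chanrecv/chansend that we will look at below. A small sketch:
package main

import "fmt"

func main() {
	full := make(chan int, 1)
	full <- 1 // the buffer is now full
	select {
	case full <- 2: // would block: qcount == dataqsiz
	default:
		fmt.Println("send would block: channel is full")
	}

	empty := make(chan int, 1)
	select {
	case <-empty: // would block: qcount == 0 and no waiting sender
	default:
		fmt.Println("receive would block: channel is empty")
	}
}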
The process of writing to a full channel
- The current goroutine (G1) creates a reference to itself (a sudog) and puts it into hchan's sendq queue;
- G1 calls gopark to put itself into the waiting state;
- The binding between M and G1 is broken;
- The scheduler picks another ready goroutine to bind to M, and M goes on running that G.
The process of reading an empty channel
- The current goroutine (G2) creates a reference to itself (a sudog);
- The sudog representing G2 is put into the recvq wait queue;
- G2 calls gopark to enter the waiting state and gives up the OS thread; G2 is now blocked.
How can a coroutine blocked on a channel be unblocked?
For a channel that is already full, when coroutine G2 performs a read, G1 (blocked on a send) is unblocked. The flow is as follows:
- G2 executes t := <-ch to receive an element A;
- Element A is taken from hchan's buf;
- A sudog is popped from the sendq wait queue;
- The value that G1 was waiting to write is copied into buf, and buf's sendx and recvx indexes are updated;
- G2 calls goready(G1) to set G1 to runnable, meaning G1 can be resumed.
For an empty channel, when coroutine G1 performs a write, G2 is unblocked. The flow is as follows:
- The value being written is passed directly to the receiving goroutine G2;
- G1 calls goready(G2) to set G2 to the runnable state so it can be scheduled.
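Here is a small illustrative program of my own (the Sleep is only there so G1 has time to block) showing the first flow: G1 blocks sending to a full channel, and it is G2's receive that wakes it up.
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 1)
	ch <- 1 // fill the buffer
	done := make(chan struct{})

	go func() { // G1
		ch <- 2 // blocks: G1's sudog is queued on sendq
		fmt.Println("G1 unblocked")
		close(done)
	}()

	time.Sleep(100 * time.Millisecond) // give G1 time to block (illustrative only)
	fmt.Println(<-ch)                  // G2's receive pops G1 from sendq and readies it
	<-done
	fmt.Println(<-ch) // the value G1 was sending is now in the buffer
}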
Implementation
Let's look at the concrete implementation of chan.
Reading data
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")}if c == nil {
if! block {return
}
gopark(nil.nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
The process for receiving data from a channel is as follows:
- CASE1: the channel is nil:
  - If block is false (non-blocking), return immediately;
  - If block is true, gopark() is called to park the current goroutine, followed by throw("unreachable").
- Fast path: if block is false, the channel is not closed, and either (the channel is unbuffered and the sender wait queue is empty) or (the channel is buffered and holds zero elements), return directly without taking the lock.
- Call lock(&c.lock) to take the channel's lock.
- CASE2: the channel has been closed and there is no data left in the buffer: unlock, zero *ep if needed, and return (true, false).
- CASE3: the sender wait queue is not empty: a sudog is dequeued and recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) is called to handle it:
  - If the channel is unbuffered, recvDirect copies the element directly from the sender to the ep object, so only one copy is needed;
  - If the channel is buffered, the buffer queue must be full (otherwise no sender would be waiting):
    - 1. Copy the element at the head of the buffer queue to the receiver (i.e. ep);
    - 2. Copy the sender's value into the slot that was just freed, so the buffer never has to move data around.
  - Release the channel lock;
  - Call goready to mark the sender goroutine as runnable.
- CASE4: the sender queue is empty and the buffer queue is not empty: take the element directly from the buffer and advance the receive index.
- CASE5: the sender queue is empty, the buffer has no elements, and block is false: return (false, false) without blocking the coroutine.
- CASE6: the sender queue is empty and the buffer is empty: add the current goroutine to recvq and block.
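To tie a few of these cases back to everyday code (an illustrative sketch of my own, not from the runtime): CASE2 is why a receive on a closed and drained channel returns the zero value with ok == false, and a select with a default branch is what hits the non-blocking fast path (block == false).
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 10
	close(ch)

	v, ok := <-ch
	fmt.Println(v, ok) // 10 true: buffered data is still delivered after close

	v, ok = <-ch
	fmt.Println(v, ok) // 0 false: channel closed and empty (CASE2)

	ch2 := make(chan int)
	select {
	case v := <-ch2:
		fmt.Println("got", v)
	default:
		fmt.Println("no data, not blocking") // non-blocking fast path
	}
}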
Writing data
/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed. it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
The main process of writing data to a channel is as follows:
- CASE1: the channel is nil (uninitialized): if block is true, sending to it blocks permanently; if block is false, the call returns directly;
- CASE2: fast path: if block is false, the channel is not closed (data cannot be written to a closed channel), and either (the channel is unbuffered and the receiver wait queue is empty) or (the channel is buffered but its queue is full), return directly;
- Call lock(&c.lock) to take the channel's lock;
- CASE3: data cannot be sent to a closed channel; doing so causes a panic;
- CASE4: if the channel's recvq is not empty, the buffer queue is skipped and the value is handed directly to the receiving goroutine:
  - sendDirect copies the value being written directly to the receiving goroutine;
  - The channel lock is released;
  - goready is called to set the receiving goroutine to the runnable state, waiting to be scheduled;
- CASE5: if the buffer queue is not full, the value is copied into the buffer and the lock is released;
- CASE6: if the buffer queue is full and recvq is empty, the current goroutine is added to sendq:
  - Acquire a sudog for the current goroutine and enqueue it on the channel's sendq;
  - Put the current goroutine to sleep.
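A short sketch of my own for two of these paths in user code: CASE5 is an ordinary buffered send, and CASE3 is the panic you get when sending on a closed channel.
package main

import "fmt"

func main() {
	defer func() {
		fmt.Println("recovered:", recover()) // "send on closed channel"
	}()

	ch := make(chan int, 1)
	ch <- 1 // CASE5: the buffer has room, the value is copied into buf
	fmt.Println(<-ch)

	close(ch)
	ch <- 2 // CASE3: send on a closed channel panics
}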
Closing the channel
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
The main shutdown process is as follows:
- Acquire the channel lock;
- Set the closed flag bit in the channel's hchan structure;
- Collect all goroutines blocked reading on the channel and link them into a list;
- Collect all goroutines blocked writing on the channel and append them to the same list (they will panic);
- Release the channel lock;
- Wake up all of the collected read and write goroutines.
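The practical effect of this wake-up is that every goroutine blocked on a receive returns at once with the zero value and ok == false (and any blocked sender panics). A minimal sketch of my own:
package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			v, ok := <-ch          // each reader blocks on recvq
			fmt.Println(id, v, ok) // after close: "<id> 0 false" (order not deterministic)
		}(i)
	}
	close(ch) // links every waiting reader into glist and readies them
	wg.Wait()
}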
Conclusion
It is worth knowing the concrete implementation. Although it does not change how channels are used, understanding the internals lets us use channels more flexibly, trace problems more easily, and learn the design ideas of the masters.
References
- Analysis of golang-channel principle
- Golang for nil channel close channel artifact features you didn’t know about
- Go language make and new keyword difference and realization principle
- Go low-level reference implementation
- Illustrates the underlying principles of Golang’s channel
- Go has the built-in function make
- Golang GMP model for concurrent scheduling
Finally
If you like my article, you can follow my public account (Programmer Malatang)
My personal blog: shidawuhen.github.io/
Review of previous articles:
Technology
- Implementation principle of Go channel
- Implementation principle of Go timer
- HTTPS Connection Process
- Rate limiting implementation 2
- Flash sale (seckill) system
- Distributed systems and consistency protocols
- Service framework and registry for microservices
- Beego framework use
- Discussion on Micro-service
- TCP Performance Optimization
- Rate limiting implementation 1
- Redis implements distributed locking
- Golang source BUG tracking
- The implementation principle of atomicity, consistency and persistence of transactions
- CDN request process details
- Common Cache tips
- How to effectively connect with third-party payment
- Gin framework concise version
- InnoDB locks and transactions
- Algorithm summary
Reading notes
- Agile revolution
- How to exercise your memory
- Simple Logic – After reading
- Hot Wind – After reading
- Analects of Confucius – After reading
- Sun Tzu’s Art of War – Reflections from reading
Thinking
- Project process management
- Some thoughts on project management
- Some thoughts on product manager
- Thinking about programmer career development
- Thinking about code review
- Markdown editor recommendation – Typora