Introduction to the

Channel is a feature of Go language. There are many problems worth discussing based on channel, such as

  1. Why is a channel concurrency safe?
  2. What is the difference between synchronous and asynchronous channels?
  3. Why does a channel block a coroutine?
  4. How can coroutines that block using channels be unblocked?

To understand the essence, need to look into the source code, after all, there is no secret under the source code.

The principle of

create

There are three kinds of channels in theory, with buffered \ without buffered \nil, written as follows:

// buffered
ch := make(chan Task, 3)
// unbuffered
ch := make(chan int)
// nil
var ch chan int
Copy the code

Builtin /builtin.go Func make(t Type, size… IntegerType) Type. Real implementation can refer to the go built-in functions make simple terms in the CMD/compile/internal/gc/typecheck typecheck1 go in function

// The result of typecheck1 MUST be assigned back to n, e.g.
// n.Left = typecheck1(n.Left, top)
func typecheck1(n *Node, top int) (res *Node) {
	if enableTrace && trace {
		defer tracePrint("typecheck1", n)(&res)
	}

	switch n.Op {
	case OMAKE:
		ok |= ctxExpr
		args := n.List.Slice()
		if len(args) == 0 {
			yyerror("missing argument to make")
			n.Type = nil
			return n
		}

		n.List.Set(nil)
		l := args[0]
		l = typecheck(l, Etype)
		t := l.Type
		if t == nil {
			n.Type = nil
			return n
		}

		i := 1
		switch t.Etype {
		default:
			yyerror("cannot make type %v", t)
			n.Type = nil
			return n

		case TCHAN:
			l = nil
			if i < len(args) {
				l = args[i]
				i++
				l = typecheck(l, ctxExpr)
				l = defaultlit(l, types.Types[TINT])
				if l.Type == nil {
					n.Type = nil
					return n
				}
				if! checkmake(t,"buffer", l) {
					n.Type = nil
					return n
				}
				n.Left = l
			} else {
				n.Left = nodintconst(0)
			}
			n.Op = OMAKECHAN // The corresponding function position
		}

		if i < len(args) {
			yyerror("too many arguments to make(%v)", t)
			n.Op = OMAKE
			n.Type = nil
			return n
		}

		n.Type = t

		if(top&ctxStmt ! =0) && top&(ctxCallee|ctxExpr|Etype) == 0 && ok&ctxStmt == 0 {
			if! n.Diag() { yyerror("%v evaluated but not used", n)
				n.SetDiag(true)
			}

			n.Type = nil
			return n
		}

		return n
	}
}
Copy the code

The actual implementation location is Runtime /chan.go

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")}ifhchanSize%maxAlign ! =0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil.true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   caseelem.kind&kindNoPointers ! =0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil.true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")}return c
}
Copy the code

As you can see from this function, the data structure of channel is Hchan

structure

Let’s look at the data structure of a channel. Based on the data structure, we can infer the implementation.

runtime/chan.go

type hchan struct {
	// The total amount of data in the channel queue
	qcount   uint           // total data in the queue
	// The capacity of the loop queue is 0 if the channel is not buffered
	dataqsiz uint           // size of the circular queue
	// Buffer queue, array type.
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	// The element occupies the size of bytes
	elemsize uint16
	// The current queue closure flag bit. Non-zero indicates that the queue is closed
	closed   uint32
	// Type of the element in the queue
	elemtype *_type // element type
	// Queue send index
	sendx    uint   // send index
	// Queue index
	recvx    uint   // receive index
	// Queue G waiting for channel.
	recvq    waitq  // list of recv waiters
	// The G queue that sends data to the channel.
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	/ / global lock
	lock mutex
}
Copy the code

With the hchan data structure and makechan function, there are several data structures worth illustrating:

  1. Dataqsiz indicates the length of the channel, or 0 if it is a non-buffered queue. Circular queuing is implemented with DataqSIz.
  2. Buf houses real data
  3. Sendx and Recvx refer to the places in the ring queue where data enters and exits a channel
  4. Sendq holds the Goroutine queue that sends data to a channel
  5. Recvq holds the Goroutine queue waiting to fetch channel data
  6. Lock is a global lock

Anwser

With the traced code, we can answer our first few questions.

Why is a channel concurrency safe?

Before performing any operation, the global lock is obtained. The operation can be performed only when the global lock is successfully obtained, ensuring concurrency security.

What is the difference between synchronous and asynchronous channels?

The underlying data structure and operation code are the same, but the dataqsiz values are different, one is 0, the other is positive.

Why does a channel block a coroutine?

The coroutine blocks when the channel is full and continues to write to it, or when there is no data in the channel but the coroutine gets data from the channel.

The implementation principle is strongly related to Golang’s GMP model of concurrent scheduling.

Write a full channel process

  1. The current Goroutine (G1) creates a reference to itself (sudog) and places it in hchan’s Sendq queue
  2. The current goroutine (G1) calls gopark to make the current coroutine waiting.
  3. Disconnect the binding between M and G1;
  4. Scheduler schedules another ready Goroutine to bind to M, which then runs another G.

The process of reading an empty channel

  1. Currently goroutine (G2) creates a reference to itself (sudog)
  2. The SUdog representing G2 is put into the RECVQ wait queue
  3. G2 will call the gopark function to enter the wait state, relinquish the OS thread, and G2 will enter the blocking state

How can coroutines that block using channels be unblocked?

For channels that are already full, when coroutine G2 performs read operations, G1 will be unblocked, and the flow is as follows

  1. G2 callt:=<-chGet an element A;
  2. Extract an element from hchan’s BUF;
  3. Pop a sudog from the sendq wait queue;
  4. Copy the data that G1 is writing to A in buF, and update the SENDx and recvx indexes of BUF;
  5. G2 calls goReady (G1) to set G1 to Runable, indicating that G1 can be restored.

For an empty channel, when a write operation is performed by coroutine G1, G2 will be unblocked, and the flow is as follows

  1. Sends the message to be written to the receiving Goroutine G2;
  2. G1 calls goReady (G2) to set G2 to ready state for scheduling.

implementation

Let’s look at the concrete implementation of Chan

Read the data

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // raceenabled: don't need to check ep, as it is always on the stack
   // or is new memory allocated by reflect.

   if debugChan {
      print("chanrecv: chan=", c, "\n")}if c == nil {
      if! block {return
      }
      gopark(nil.nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.
   //
   // After observing that the channel is not ready for receiving, we observe that the
   // channel is not closed. Each of these observations is a single word-sized read
   // (first c.sendq.first or c.qcount, and second c.closed).
   // Because a channel cannot be reopened, the later observation of the channel
   // being not closed implies that it was also not closed at the moment of the
   // first observation. We behave as if we observed the channel at that moment
   // and report that the receive cannot proceed.
   //
   // The order of operations is important here: reversing the operations can lead to
   // incorrect behavior when racing with a close.
   if! block && (c.dataqsiz ==0 && c.sendq.first == nil ||
      c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
      atomic.Load(&c.closed) == 0 {
      return
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   lock(&c.lock)

   ifc.closed ! =0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(c.raceaddr())
      }
      unlock(&c.lock)
      ifep ! =nil {
         typedmemclr(c.elemtype, ep)
      }
      return true.false
   }

   ifsg := c.sendq.dequeue(); sg ! =nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func(a) { unlock(&c.lock) }, 3)
      return true.true
   }

   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      ifep ! =nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true.true
   }

   if! block { unlock(&c.lock)return false.false
   }

   // no sender available: block on this channel.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   ift0 ! =0 {
      mysg.releasetime = - 1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)
   goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

   // someone woke us up
   ifmysg ! = gp.waiting { throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}
Copy the code

The process for receiving data from a channel is as follows:

  • CASE1: scenario where the leading channel is nil:

    • If the block is non-blocking, return;
    • If the block is blocked, gopark() is called to block the current Goroutine and an exception is thrown.
  • In the pre-scenario, if the block is non-blocking and the channel is non-buffered and the sender wait queue is empty or if the channel is buffered but the number of elements in the queue is zero and the channel is not closed, return directly.

  • Call lock(&c.lock) to lock a channel’s global lock;

  • CASE2: The channel has been closed and there is no data in the channel buffer.

  • Unsafe.Pointer, unlockf func(), skip int); unlockf func(), skip int);

    Function handling:

    • A channel is a non-buffered channel that calls the recvDirect function directly from the Sender recv element to the EP object, so it only needs to be copied once;
    • If the sender queue is not empty, the buffered channel’s buffer queue must be full:
      • 1. Copy the rival elements of the channel buffer queue to the receiver(i.e. Ep);
      • 2. Copy the sender to the element just ejected from the channel buffer so that the buffer queue doesn’t have to move the data.
    • Release channel global lock;
    • Call goReady to mark the ready state of the goroutine.
  • CASE4: Sender queue is empty, buffer queue is not empty, directly fetch queue element, move the header index;

  • CASE5: Sender queue is empty, buffer queue has no elements and does not block coroutine, return (false,false);

  • CASE6: Sender queue is empty and channel’s cache queue is empty, add the goroutine to the RECV queue and block.

Write data

/* * generic single channel send/recv * If block is not nil, * then the protocol will not * sleep but return if it could * not complete. * * sleep can wake up with g.param == nil * when a channel involved in the sleep has * been closed. it is easiest to loop and re-run * the operation; we'll see that it's now closed. */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil {
      if! block {return false
      }
      gopark(nil.nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")}if debugChan {
      print("chansend: chan=", c, "\n")}if raceenabled {
      racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
   }

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   //
   // After observing that the channel is not closed, we observe that the channel is
   // not ready for sending. Each of these observations is a single word-sized read
   // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
   // Because a closed channel cannot transition from 'ready for sending' to
   // 'not ready for sending', even if the channel is closed between the two observations,
   // they imply a moment between the two when the channel was both not yet closed
   // and not ready for sending. We behave as if we observed the channel at that moment,
   // and report that the send cannot proceed.
   //
   // It is okay if the reads are reordered here: if we observe that the channel is not
   // ready for sending and then observe that it is not closed, that implies that the
   // channel wasn't closed during the first observation.
   if! block && c.closed ==0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
      (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
      return false
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   lock(&c.lock)

   ifc.closed ! =0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))}ifsg := c.recvq.dequeue(); sg ! =nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func(a) { unlock(&c.lock) }, 3)
      return true
   }

   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         raceacquire(qp)
         racerelease(qp)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

   if! block { unlock(&c.lock)return false
   }

   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   ift0 ! =0 {
      mysg.releasetime = - 1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   ifmysg ! = gp.waiting { throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   return true
}
Copy the code

The main process of writing data to a channel is as follows:

  • CASE1: When a channel is empty or uninitialized, sending data to it will block permanently if block means blocking; If a block means non-blocking, it returns directly;
  • CASE2: In the pre-scenario, if the block is non-blocking and the channel is not closed (the closed channel cannot write data) and (the channel is not buffered and the receiver wait queue is empty) or (the channel is buffered but the queue is full), return directly.
  • calllock(&c.lock)A global lock on a channel
  • CASE3: Cannot send data to a closed channel, causing panic.
  • CASE4: If the recV queue on a channel is not empty, the channel’s cache queue is skipped and the message is sent directly to the receiving Goroutine:
    • Call the sendDirect method to send the message to be written to the receiving Goroutine;
    • Release channel global lock;
    • The goReady function is called to set the goroutine that receives the message to a ready state, waiting for dispatch.
  • CASE5: If the cache queue is not full, the message is copied to the cache queue and the global lock is released.
  • CASE6: If the cache queue is full and the recV of the message receiving queue is empty, the current Goroutine is added to the SEND queue.
    • Get the sudog of the current Goroutine and enter the send queue of the channel.
    • Put the current Goroutine to sleep

Shut down the channel

func closechan(c *hchan) {
   if c == nil {
      panic(plainError("close of nil channel"))
   }

   lock(&c.lock)
   ifc.closed ! =0 {
      unlock(&c.lock)
      panic(plainError("close of closed channel"))}if raceenabled {
      callerpc := getcallerpc()
      racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
      racerelease(c.raceaddr())
   }

   c.closed = 1

   var glist gList

   // release all readers
   for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      ifsg.elem ! =nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      ifsg.releasetime ! =0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }

   // release all writers (they will panic)
   for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
      }
      sg.elem = nil
      ifsg.releasetime ! =0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }
   unlock(&c.lock)

   // Ready all Gs now that we've dropped the channel lock.
   for! glist.empty() { gp := glist.pop() gp.schedlink =0
      goready(gp, 3)}}Copy the code

The main shutdown process is as follows:

  • Get global lock;
  • Set the close flag bit of the channel data structure chan.
  • Get the read Goroutine above the current channel and link it into a linked list.
  • Get the write Goroutine above the current channel and concatenate it to the end of the previous read list;
  • Release global lock;
  • Wake up all read and write Goroutines.

conclusion

It is good to know the specific implementation, although it will not bring changes in the use, but after understanding the connotation, it can be more flexible to use the channel, it can be easier to trace the problem, but also to learn the design ideas of masters.

data

  1. Analysis of golang-channel principle
  2. Golang for nil channel close channel artifact features you didn’t know about
  3. Go language make and new keyword difference and realization principle
  4. Go low-level reference implementation
  5. Illustrates the underlying principles of Golang’s channel
  6. Go has the built-in function make
  7. Golang GMP model for concurrent scheduling

The last

If you like my article, you can follow my public account (Programmer Malatang)

My personal blog is shidawuhen.github. IO /

Review of previous articles:

technology

  1. Implementation principle of Go channel
  2. Implementation principle of Go timer
  3. HTTPS Connection Process
  4. Current limiting 2
  5. Seconds kill system
  6. Distributed systems and consistency protocols
  7. Service framework and registry for microservices
  8. Beego framework use
  9. Discussion on Micro-service
  10. TCP Performance Optimization
  11. Current limiting implementation 1
  12. Redis implements distributed locking
  13. Golang source BUG tracking
  14. The implementation principle of atomicity, consistency and persistence of transactions
  15. CDN request process details
  16. Common Cache tips
  17. How to effectively connect with third-party payment
  18. Gin framework concise version
  19. InnoDB locks and transactions
  20. Algorithm is summarized

Reading notes

  1. Agile revolution
  2. How to exercise your memory
  3. Simple Logic – After reading
  4. Hot Wind – After reading
  5. Analects of Confucius – After reading
  6. Sun Tzu’s Art of War – Reflections from reading

thinking

  1. Project process management
  2. Some thoughts on project management
  3. Some thoughts on product manager
  4. Thinking about programmer career development
  5. Thinking about code review
  6. Markdown editor recommends – Typora