In THE Go language, goroutines pass data through channels. In THE CSP (Communication Sequence Model), Goroutines are entities and channels are the medium through which information is transmitted.

Both receiving and sending data in a Channel follow the first-in, first-out principle.

Optimistic locking is a concurrency control idea, not a real lock.

In a sense, a Channel is a locked queue for synchronization and communication.

The type of the Channel

  • The synchronization Channel
    • The sender sends data directly to the receiver without the need for buffers.
  • Asynchronous Channel
    • Traditional producer-consumer model based on circular buffer.
  • Chan struct{} asynchronous Channel
    • The struct{} type does not take up memory space and does not need to implement buffer and direct send semantics.

Channel data structure

Buf unsafe.Pointer // Elemsize uint16 // Parameter Description Value CHANNEL Size that can be sent or received CLOSED uint32 ElemTYPE *_type // Type that can be sent or received by channel sendX Uint // Position of the sending operation RecVx uint // Position of the receiving operation Recvq Waitq // Store a list of goroutines whose current channel is blocked by insufficient buffers. Sendq Waitq Lock mutex}Copy the code

Among them, qcount, dataqsiz, buf, Sendx, recv constitute the bottom circular queue.

Create CHANNEL

Make (chan int, 10). The Go compiler converts the expression to a node of type OMAKE. During the type checking phase, the OMAKE node is converted to a node of type OMAKECHAN. Eventually OMAKECHAN will be converted to a function that calls Makechan or Makechan64 before the SSA intermediate code generation phase, which handles buffer sizes greater than 2^32.

Makechan’s implementation logic is to initialize the hchan structure and buffer based on the type of the transceiver element in the Channel and the size of the buffer:

  • If there is no buffer in the current Channel, then runtime.hchan is allocated only one memory space.
  • If the type stored in the current Channel is not a pointer type, contiguous memory space is allocated for the current Channel and the underlying array.
  • By default, memory is allocated for runtime.hchan and buffer separately;

Finally, the corresponding fields are updated.

SEND DATA

Data transmission in the channel is represented by ch < -i, that is, data in I is sent to the channel. Eventually the <- symbol is converted to the OSEND node, which is eventually converted in the code to the Chansend1 function, which calls the Chansend function and passes in the channel and the data to be sent. That is, all the logic for sending data to the channel is implemented in the chansend function.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
Copy the code

Where a block sets whether the current send operation is blocked. To prevent race conditions, a lock is required before the logic can be executed.

The logic of Chansend is as follows:

  • If there are waiting recipients, the runtime.send command is used to send data to the blocking recipients.

    • Send also calls sendDirect and goReady
  • When there is free space in the buffer, the sent data is written to the buffer of the Channel.

    • The next available address in the buffer is checked, the data is copied to that location, and the index and counter are incremented
  • Wait for the other Goroutine to receive data from the Channel when there is no buffer or the buffer is full.

    • It’s a little bit more complicated, and the logic is as follows
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool{...if! block { unlock(&c.lock)return false
    	}
    
    	gp := getg()
    	mysg := acquireSudog()
    	mysg.elem = ep
    	mysg.g = gp
    	mysg.c = c
    	gp.waiting = mysg
    	c.sendq.enqueue(mysg)
    	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
    
    	gp.waiting = nil
    	gp.param = nil
    	mysg.c = nil
    	releaseSudog(mysg)
    	return true
    }
    Copy the code

To sum up:

  • If the current ChannelrecvqIf there is a blocked Goroutine, data will be sent directly to the current Goroutine and set to the next running Goroutine.
  • If a Channel has a buffer and it has free capacity, we store data directly into the current buffersendxIn the position of;
  • If the above two conditions are not met, one is createdruntime.sudogStructure and add it to Channel’ssendqIn the queue, the current Goroutine is also blocked waiting for other coroutines to receive data from the Channel;

RECIVE Data

There are two ways to receive data: I < -ch and I, ok < -ch. Both methods will eventually be processed by the compiler as nodes of type ORECV, which will be converted to type OAS2RECV during the type checking phase. These two methods are eventually converted into calls to chanrecv1 and chanrecv2, but essentially both call the chanrecv function.

Chanrecv is basically implemented as follows

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { if ! block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } lock(&c.lock) if c.closed ! = 0 && c.qcount == 0 { unlock(&c.lock) if ep ! = nil { typedmemclr(c.elemtype, ep) } return true, false }Copy the code

To sum it up:

  • If the Channel is empty, runtime.gopark is called to suspend the current Goroutine.
  • If the Channel is closed and there is no data in the buffer, runtime.chanrecv returns directly;
  • If the ChannelsendqIf there is a pending Goroutine in the queue, therecvxThe index data is copied to the memory space where the receive variable is located and thesendqThe Goroutine data in the queue is copied to the buffer;
  • If a Channel’s buffer contains data, it reads it directlyrecvxIndex corresponding data;
  • By default, the current Goroutine will be suspendedruntime.sudogStructural joinrecvqQueue and fall asleep waiting for the scheduler to wake up;

Ref

  • Draveness. Me/golang/docs…