In Go, goroutines pass data to each other through channels. In the CSP (Communicating Sequential Processes) model, goroutines are the entities and channels are the medium through which information is transmitted.
Both sending to and receiving from a channel follow the first-in, first-out (FIFO) principle.
Optimistic locking is a concurrency control idea, not a real lock.
In a sense, a Channel is a locked queue for synchronization and communication.
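The FIFO behaviour is easy to observe from user code. The following is a minimal sketch, not runtime code; the helper name drainInOrder is made up for illustration.

```go
package main

import "fmt"

// drainInOrder (a hypothetical helper) sends every value into a buffered
// channel, closes it, and returns the values in the order they come out.
func drainInOrder(values []int) []int {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v // the buffer has room for every value, so this never blocks
	}
	close(ch)

	var out []int
	for v := range ch { // range stops once the closed channel is drained
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(drainInOrder([]int{1, 2, 3})) // prints [1 2 3]
}
```

The values come out in exactly the order they were sent, which is what the queue analogy predicts.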
Channel types
- Synchronous channel
  - The sender hands data directly to the receiver; no buffer is needed.
- Asynchronous channel
  - The traditional producer-consumer model, based on a circular buffer.
- chan struct{} asynchronous channel
  - The struct{} type occupies no memory, so neither buffering nor direct-send semantics need to be implemented for it.
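The three kinds of channel listed above can be sketched from the user's side as follows. All helper names (syncHandoff, bufferedLen, emptyStructSize, waitDone) are invented for this example; only make, close, len, and unsafe.Sizeof come from Go itself.

```go
package main

import (
	"fmt"
	"unsafe"
)

// syncHandoff passes one value through an unbuffered (synchronous) channel.
// The send completes only when the receiver takes the value.
func syncHandoff(v int) int {
	ch := make(chan int)
	go func() { ch <- v }()
	return <-ch
}

// bufferedLen sends n values into a buffered (asynchronous) channel and
// reports how many are queued. It assumes n <= capacity; otherwise the
// send would block forever because nothing receives.
func bufferedLen(n, capacity int) int {
	ch := make(chan int, capacity)
	for i := 0; i < n; i++ {
		ch <- i
	}
	return len(ch)
}

// emptyStructSize reports the size in bytes of a struct{} value.
func emptyStructSize() uintptr {
	return unsafe.Sizeof(struct{}{})
}

// waitDone uses a chan struct{} purely as a signal: no data is carried.
func waitDone() bool {
	done := make(chan struct{})
	go func() { close(done) }()
	<-done // returns once the channel is closed
	return true
}

func main() {
	fmt.Println(syncHandoff(42))   // 42
	fmt.Println(bufferedLen(2, 4)) // 2
	fmt.Println(emptyStructSize()) // 0
	fmt.Println(waitDone())        // true
}
```

Because struct{} has size zero, a chan struct{} is a common choice when only the event matters and not the payload.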
Channel data structure
    type hchan struct {
        qcount   uint           // number of elements currently in the channel
        dataqsiz uint           // length of the circular queue
        buf      unsafe.Pointer // pointer to the buffer
        elemsize uint16         // size of the elements the channel can send or receive
        closed   uint32
        elemtype *_type         // type of the elements the channel can send or receive
        sendx    uint           // position of the next send operation
        recvx    uint           // position of the next receive operation
        recvq    waitq          // list of goroutines blocked on this channel because the buffer cannot serve them
        sendq    waitq
        lock     mutex
    }
Of these fields, qcount, dataqsiz, buf, sendx, and recvx together make up the underlying circular queue.
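To make the role of these five fields concrete, here is a simplified model of such a circular queue. It is only a sketch under stated assumptions: the type ring and its methods are hypothetical, buf is a typed slice instead of an unsafe.Pointer, and there is no locking or goroutine parking.

```go
package main

import "fmt"

// ring is a hypothetical, simplified model of the circular queue in hchan.
type ring struct {
	qcount   uint  // number of elements currently queued
	dataqsiz uint  // capacity of the queue
	buf      []int // backing array of dataqsiz elements
	sendx    uint  // index where the next send writes
	recvx    uint  // index where the next receive reads
}

func newRing(size uint) *ring {
	return &ring{dataqsiz: size, buf: make([]int, size)}
}

// push stores v at sendx and reports false when the queue is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around to the start of the array
	}
	r.qcount++
	return true
}

// pop reads the element at recvx and reports false when the queue is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around to the start of the array
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: the queue is full
	v, _ := r.pop()
	fmt.Println(v)                // 1
	r.push(3)                     // reuses slot 0
	fmt.Println(r.sendx, r.recvx) // 1 1
}
```

The two indices chase each other around the array, so no element ever has to be moved when the queue drains.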
Creating a channel
A channel is created with an expression such as make(chan int, 10). The Go compiler first converts the expression to a node of type OMAKE. During the type checking phase, the OMAKE node is converted to a node of type OMAKECHAN. Before the SSA intermediate code generation phase, OMAKECHAN is in turn converted to a call to makechan or makechan64; the latter handles buffer sizes greater than 2^32.
makechan initializes the hchan structure and the buffer based on the element type the channel sends and receives and on the buffer size:
- If the channel has no buffer, memory is allocated only for runtime.hchan.
- If the element type stored in the channel is not a pointer type (more precisely, contains no pointers), a single contiguous block of memory is allocated for the channel and its underlying array.
- By default, memory for runtime.hchan and for the buffer is allocated separately.
Finally, the corresponding fields of the structure (such as elemsize, elemtype, and dataqsiz) are updated.
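The three allocation cases can be summarised as a small decision function. This is a sketch only: allocPlan and its string results are hypothetical and do not exist in the runtime, and the 8-byte element size assumes a 64-bit platform. It also assumes the first case is decided by the total buffer size in bytes being zero, which would cover zero-size element types such as struct{} as well.

```go
package main

import "fmt"

// allocPlan is a hypothetical helper, not a runtime function. It names which
// allocation case applies, given the element size in bytes, the requested
// buffer length, and whether the element type contains pointers.
func allocPlan(elemSize, bufLen uintptr, hasPointers bool) string {
	switch {
	case elemSize*bufLen == 0:
		// No buffer memory is needed at all.
		return "hchan only"
	case !hasPointers:
		// The garbage collector never needs to scan the buffer.
		return "hchan and buffer in one contiguous block"
	default:
		return "hchan and buffer allocated separately"
	}
}

func main() {
	fmt.Println(allocPlan(8, 0, false))  // like make(chan int)
	fmt.Println(allocPlan(8, 10, false)) // like make(chan int, 10)
	fmt.Println(allocPlan(8, 10, true))  // like make(chan *int, 10)
	fmt.Println(allocPlan(0, 10, false)) // like make(chan struct{}, 10)

	ch := make(chan int, 10)
	fmt.Println(len(ch), cap(ch)) // 0 10
}
```

From user code, only the last two lines are observable: len reports the queued element count and cap the buffer size requested in make.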
Sending data
Sending to a channel is written ch <- i, which sends the value of i into the channel. The compiler converts the <- statement to an OSEND node, which is eventually converted to a call to chansend1. chansend1 calls chansend, passing in the channel and the data to be sent, so all the logic for sending data to a channel lives in chansend.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
The block parameter indicates whether the current send operation is allowed to block. To prevent race conditions, the channel's lock is acquired before the send logic runs.
The logic of chansend is as follows:
- If a receiver is already waiting, runtime.send is called to send the data directly to the blocked receiver.
  - send in turn calls sendDirect and goready.
- When the buffer has free space, the data being sent is written into the channel's buffer.
  - The next available slot in the buffer is located, the data is copied into it, and the index and counter are incremented.
- When there is no buffer or the buffer is full, the sender waits for another goroutine to receive data from the channel.
  - This case is a little more involved; the logic is as follows:
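    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
        if !block {
            // Non-blocking send: give up instead of waiting.
            unlock(&c.lock)
            return false
        }

        // Record the current goroutine and the data to send in a sudog,
        // then queue it on the channel's list of waiting senders.
        gp := getg()
        mysg := acquireSudog()
        mysg.elem = ep
        mysg.g = gp
        mysg.c = c
        gp.waiting = mysg
        c.sendq.enqueue(mysg)
        // Park the goroutine and release the lock until a receiver wakes it.
        goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

        // Woken up: the data has been taken, so clean up and return.
        gp.waiting = nil
        gp.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        return true
    }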
To sum up:
- If a blocked goroutine is waiting in the channel's recvq, the data is sent directly to that goroutine, which is then set as the next goroutine to run.
- If the channel has a buffer with free capacity, the data is stored directly in the buffer at position sendx.
- If neither of these conditions is met, a runtime.sudog structure is created and added to the channel's sendq queue, and the current goroutine blocks until another goroutine receives data from the channel.
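The effect of the block parameter can be observed from user code with a select statement that has a default case, which performs a send that never blocks. The sketch below assumes that this form maps onto the non-blocking path described above; the helper name trySend is made up for illustration.

```go
package main

import "fmt"

// trySend (a hypothetical helper) attempts a send that never blocks and
// reports whether the channel accepted the value.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// No waiting receiver and no free buffer slot.
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: stored in the buffer
	fmt.Println(trySend(ch, 2)) // false: buffer full, no receiver waiting
	<-ch                        // free the single buffer slot
	fmt.Println(trySend(ch, 3)) // true: the slot is free again
}
```

A plain ch <- v statement in the second call would instead park the goroutine on the send queue until something receives.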
Receiving data
There are two ways to receive data: i := <-ch and i, ok := <-ch. The compiler processes both forms as nodes of type ORECV; the two-value form is converted to type OAS2RECV during the type checking phase. The two forms are eventually converted into calls to chanrecv1 and chanrecv2 respectively, but both ultimately call the chanrecv function.
The beginning of chanrecv is implemented roughly as follows:
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        if c == nil {
            if !block {
                return
            }
            // Receiving from a nil channel blocks forever.
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }

        lock(&c.lock)

        if c.closed != 0 && c.qcount == 0 {
            // Closed and drained: clear the destination and return.
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
        ...
    }
To sum up:
- If the channel is nil, runtime.gopark is called to suspend the current goroutine.
- If the channel is closed and there is no data in the buffer, runtime.chanrecv returns immediately.
- If a goroutine is pending in the channel's sendq queue, the data at index recvx is copied into the memory of the receiving variable, and the data of the goroutine in the sendq queue is copied into the buffer.
- If the channel's buffer contains data, the data at index recvx is read directly.
- By default, the current goroutine is suspended: a runtime.sudog structure is added to the recvq queue and the goroutine sleeps until the scheduler wakes it up.
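Some of the receive cases above are visible from ordinary code. The following sketch uses made-up helper names (recvAfterClose, tryRecv) and shows a receive from a closed channel, with and without buffered data, plus a non-blocking receive on a nil channel.

```go
package main

import "fmt"

// recvAfterClose buffers one value, closes the channel, and receives twice.
func recvAfterClose() (first int, firstOK bool, second int, secondOK bool) {
	ch := make(chan int, 1)
	ch <- 7
	close(ch)
	first, firstOK = <-ch   // buffered data is still delivered after close
	second, secondOK = <-ch // closed and empty: zero value and ok == false
	return
}

// tryRecv attempts a receive that never blocks. It reports false when no
// value was delivered, including when the channel is nil.
func tryRecv(ch chan int) (int, bool) {
	select {
	case v, ok := <-ch:
		return v, ok
	default:
		return 0, false
	}
}

func main() {
	fmt.Println(recvAfterClose()) // 7 true 0 false

	var nilCh chan int
	fmt.Println(tryRecv(nilCh)) // 0 false: a nil channel is never ready
}
```

A plain <-nilCh without the select would block forever, which matches the gopark call in the nil-channel branch.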
References
- draveness.me/golang/docs…