Hello everyone! The “Deep Decryption of Go” series has been quiet for a while. Today we talk about channels. Happy reading!

Concurrency model

Concurrency and parallelism

We all know the famous Moore’s Law. In 1965, Gordon Moore, then at Fairchild, published a paper predicting that the number of transistors and resistors on a semiconductor chip would double every year for the next decade. In 1975, Moore published another paper, changing “every year” to “every two years.” The prediction held roughly true until around 2012.

But Moore’s Law will eventually end as transistor circuits approach their physical limits, and adding more transistors is no longer an effective way to make computers faster. So people changed their approach and looked for other ways to improve performance, which is why multi-core computers emerged.

This seemed fine, but people ran into the limits of another law, Amdahl’s Law, which models how efficiently a program runs in parallel. It states that the maximum performance gain a program can get from parallelism is limited by the portion of the code that must run serially.

Take an interactive program with a user interface: it has to wait for the user. The program can move on only after the user presses a button, so this part must run sequentially. The program’s speed depends on how fast the user interacts with it, no matter how many cores you have. If the user just doesn’t press Next, what can you do?
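To put numbers on Amdahl’s Law: it is usually stated as speedup = 1 / ((1 - p) + p/n), where p is the fraction of the work that can run in parallel and n is the number of cores. The Go function below is only an illustrative sketch (the name amdahlSpeedup is mine, not from any library) that evaluates this formula.

```go
package main

import "fmt"

// amdahlSpeedup returns the theoretical speedup predicted by Amdahl's Law
// for a program whose parallelizable fraction is p (0 <= p <= 1)
// when it runs on n cores: 1 / ((1 - p) + p/n).
func amdahlSpeedup(p float64, n int) float64 {
	return 1 / ((1 - p) + p/float64(n))
}

func main() {
	// A program that is 90% parallel: more cores help less and less,
	// and the speedup approaches but never reaches 1/(1-p) = 10.
	for _, n := range []int{2, 4, 16, 1024} {
		fmt.Printf("p=0.90, n=%4d -> speedup %.2f\n", n, amdahlSpeedup(0.9, n))
	}
}
```

The serial 10% dominates as n grows, which is exactly the situation of the user who won’t press Next.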

With the rise of cloud computing in the 2000s, people could easily obtain computing resources in the cloud, scale their services horizontally, draw on resources across many machines, and even distribute computing tasks to machines around the world. But this also brought many problems and challenges, such as how machines communicate with each other and how results are aggregated. One of the hardest challenges is finding a good model for describing concurrency.

We all know how hard it is to write concurrent code without any bugs. Some concurrency bugs are discovered only years after a system goes live, often for strange reasons, such as the number of users reaching a certain threshold.

Concurrency problems generally fall into the following types:

Data race. Simply put, two or more threads access the same variable at the same time, and at least one of them writes to it, causing unexpected results.

Atomicity. Within a defined context, an atomic operation is indivisible, and the choice of context matters a great deal. Some code that looks atomic in your program, such as the simplest i++, usually takes several machine instructions to complete (Load, Incr, Store), so it is not indivisible and therefore not atomic. Atomicity is what lets us safely build programs that are correct under concurrency.
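A minimal sketch of the i++ problem in Go, assuming we want several goroutines to bump one shared counter. The helper name countAtomically is hypothetical; atomic.AddInt64 and atomic.LoadInt64 are standard library functions from sync/atomic that make the read-modify-write indivisible.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countAtomically starts `workers` goroutines that each increment a shared
// counter `perWorker` times. atomic.AddInt64 performs the load-increment-store
// as one indivisible operation, so no increments are lost.
func countAtomically(workers, perWorker int) int64 {
	var counter int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				atomic.AddInt64(&counter, 1) // a plain counter++ here would be a data race
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countAtomically(8, 1000))
}
```

Replacing the atomic call with a plain counter++ makes the result unpredictable, and running the program with go run -race should flag it as a data race.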

Memory access synchronization. Sections of code that only one thread may access at a time are called critical sections. Go typically uses sync.Mutex from the sync package to control access to them. Locking can carry significant performance overhead, so you generally need to consider how often the locked region will be entered and how coarse or fine the lock should be.

Deadlock. In a deadlocked program, every thread is waiting for another thread, forming a cycle in which none of them can proceed, so the program stops making progress.

Livelock. Imagine you are walking down a narrow path and someone is walking toward you. You step to your left to let him pass; at the same moment he steps to his right, so you are still blocking each other. Then both of you switch to the other side, with the same result. This is livelock: everyone looks busy, but nobody moves forward.

Starvation. A concurrent thread cannot obtain the resources it needs to do further work. Usually a greedy thread holds resources for a long time, so other threads cannot get them.

To quote a classic description of the difference between concurrency and parallelism:

Concurrency is the ability to deal with more than one thing at a time. Parallelism is the ability to do more than one thing at a time.

The explanation from Yuhen’s “Go Language Learning Notes”:

Concurrency is the logical ability to handle multiple tasks at once; Parallelism is the physical execution of multiple tasks simultaneously.

But Concurrency in Go points out that concepts in computing are abstractions, and concurrency and parallelism are no exception. The book describes the difference between them as follows:

Concurrency is a property of the code; parallelism is a property of the running program.

In other words, concurrency belongs to the code, and parallelism belongs to the running program. Novel, isn’t it? It was the first time I had seen it put this way, but the more I think about it, the more sense it makes.

We keep saying that we write concurrent or parallel code, but can we guarantee anything? If you run “parallel” code on a machine with only one core, is it still parallel? Even a genius cannot make code run in parallel there; at best, the code is concurrent, and that’s all.

Of course, on the surface it still looks parallel, but that is just CPU sleight of hand: multiple threads take turns on the CPU, and over a coarse enough time interval they appear to run in parallel.

So we can really only write concurrent code, not parallel code; we merely hope that the concurrent code will run in parallel. Whether it actually does is decided by the layers of abstraction beneath it: the concurrency primitives in the code, the language runtime, the operating system, and the virtual machine or container it runs in, each layer lower than the one before. So when we talk about concurrency or parallelism, we are implicitly specifying a context, that is, a level of abstraction.

For example, if two people each open the calculator program on their own computer at the same time, the two programs do not affect each other: they run in parallel. In this case, the context is the two people’s machines, and the two calculator processes are the parallel elements.

As we move down the levels of abstraction, concurrency becomes both harder to model and more important to get right. For concurrent programs to execute correctly, you need to understand the concurrency model in depth.

Before Go was released, the lowest abstraction we had in mind when writing concurrent code was the OS thread. Go added the goroutine to this chain of abstractions. Go also borrowed a concept from the renowned computer scientist Tony Hoare: the channel. Hoare is the author of the famous paper “Communicating Sequential Processes”.

It might seem that Go makes things more complicated by introducing a lower-level abstraction, but that is not the case. The goroutine is not really another layer stacked on top of threads; it effectively replaces them. When Gophers write code, they rarely think about OS threads; most of the time they only think about goroutines and channels. Of course, shared memory is still used sometimes, usually through the sync package, for example sync.Mutex.

What is CSP

CSP is often cited as a key factor in Go’s success with concurrent programming. CSP stands for “Communicating Sequential Processes”, the title of a paper Tony Hoare published in Communications of the ACM in 1978. The paper argues that input and output are basic primitives of programming, especially for concurrent programs.

When the paper appeared, modular programming was an active research topic, and the use of the goto statement was the most hotly debated issue. Object-oriented programming was on the rise, and few people paid attention to concurrent programming.

In the paper, CSP is also the name of a small programming language the author defines, with input and output commands for communication between processes. A process needs input to drive it and produces output for other processes to consume; a “process” here can be a process, a thread, or even a block of code. The output command (!) sends data to a process, and the input command (?) reads data from a process. Go’s channel borrows from this design.
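To make the mapping concrete, here is a sketch modeled on the COPY example in Hoare’s paper, which in CSP notation reads *[c:character; west?c -> east!c]: repeatedly input a character from west and output it to east. In Go, receiving from a channel plays the role of ?, and sending plays the role of !. The name copyProc and the use of close to signal that west has finished are my own assumptions, not part of the paper.

```go
package main

import "fmt"

// copyProc mirrors Hoare's COPY process:
//   *[c:character; west?c -> east!c]
// Receiving from west stands in for `west?c`, sending to east for `east!c`.
// Assumption of this sketch: "west has terminated" is modeled by closing west.
func copyProc(west <-chan rune, east chan<- rune) {
	for c := range west { // west?c
		east <- c // east!c
	}
	close(east) // tell the consumer that COPY has terminated too
}

func main() {
	west := make(chan rune)
	east := make(chan rune)
	go copyProc(west, east)
	go func() {
		for _, r := range "CSP" {
			west <- r
		}
		close(west)
	}()
	for r := range east {
		fmt.Printf("%c", r)
	}
	fmt.Println()
}
```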

Hoare also uses guarded commands of the form guard -> command: if the guard on the left evaluates to false, the command on the right is not executed.

With these input-output commands, Hoare demonstrated that the problem of concurrent programming can be simplified if communication between processes is prioritized in a programming language.

Go is one of the first languages to build these CSP ideas into its core and develop them further. Memory access synchronization is useful in some cases and is supported in Go through the sync package, but it can be error-prone in large programs.

Go built CSP ideas into the core of the language from the start, which makes concurrent programming one of Go’s distinctive strengths and easy to reason about.

The concurrency model of most programming languages is based on threads and synchronized memory access. Go replaces these with goroutines and channels: goroutines are similar to threads, and channels are similar to mutexes (used for memory access synchronization).

Goroutines free programmers to focus on the business logic. Instead of worrying about thread libraries, thread overhead, thread scheduling, and other low-level concerns, you let goroutines take care of them.

Channels compose naturally with other channels: for example, the channels that collect results from different subsystems can be fed into a single channel. Channels can also be combined with select, cancellation, and timeouts. A mutex offers none of these features.

Go’s concurrency philosophy is excellent, and its guidance is simple: use channels whenever possible, and treat goroutines as a free resource that you can use liberally.

To be clear, the preceding discussion draws on the book Concurrency in Go, which I highly recommend reading.

With the introduction over, let’s turn to today’s protagonist: the channel.

What is a channel

Goroutine and Channel are the two cornerstones of concurrent programming in Go. Goroutines are used to perform concurrent tasks, and channels are used for synchronization and communication between Goroutines.

A channel sets up a pipe between goroutines and passes data through it, enabling the goroutines to communicate. Because channels are safe for concurrent use, they are very convenient. A channel also delivers elements in first-in, first-out order, and channel operations are what block and wake up goroutines.

I believe you must have seen a sentence:

Do not communicate by sharing memory; instead, share memory by communicating.


This is Go’s concurrency philosophy, which derives from the CSP model and is realized through channels.

Don’t they mean the same thing?

The first half refers to concurrent programming with the components of the sync package, while Go recommends the second half: concurrent programming with channels. Both approaches are necessary and effective. In fact, if you read the channel source code, you will find that a channel uses a mutex internally to control concurrent access. A channel is simply a higher-level concurrency primitive that wraps more functionality.

The section “Go’s Philosophy on Concurrency” in Chapter 2 of Concurrency in Go offers a decision tree and a detailed discussion of whether to use the low-level primitives in the sync package or a channel. It is also well worth reading. Here is the picture:

How channels implement CSP

The channel is a very important type in Go and a first-class object. Through channels, Go shares memory by communicating. Channels are an important means of passing data and synchronizing between goroutines.

Using atomic functions and read/write locks can secure shared access to resources, but using channels is more elegant.

As the name suggests, a channel is a conduit, similar to a pipe in Linux. The syntax for declaring channel types is as follows:

chan T // Declare a two-way channel
chan<- T // Declare a channel that can only be used for sending
<-chan T // Declare a channel that can only be used for receiving

In a unidirectional channel type, the <- arrow shows the direction. Read the type from left to right and the direction is immediately clear: in chan<- T the data flows into the channel (send-only), and in <-chan T it flows out of the channel (receive-only).
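In practice, send-only and receive-only types usually appear in function signatures: a bidirectional channel created with make is passed to parameters that restrict what each function may do with it. The names produce and sum below are illustrative.

```go
package main

import "fmt"

// produce may only send on out: its parameter type is chan<- int.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // closing is allowed on a send-only channel
}

// sum may only receive from in: its parameter type is <-chan int.
// Writing `in <- 1` inside sum would be a compile-time error.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional; converts implicitly to either direction
	go produce(ch, 10)
	fmt.Println(sum(ch)) // 55
}
```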

Because a channel is a reference type, its value is nil until it is initialized with the make function. Passing make an int that specifies the buffer size constructs a buffered channel; omitting it or passing 0 constructs an unbuffered channel.

The two behave differently. An unbuffered channel cannot hold any elements, so operations must strictly alternate: “send -> receive -> send -> receive -> …”. A send on an unbuffered chan blocks until some goroutine receives the value, so you cannot send two elements in a row without a receive in between. Buffered channels are more relaxed; after all, they have a buffer to absorb the difference.
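A short sketch of these differences, using only built-in operations: an uninitialized channel is nil, a buffered channel accepts sends up to its capacity without any receiver, and a send on an unbuffered channel finishes only when another goroutine receives.

```go
package main

import "fmt"

func main() {
	var notMade chan int
	fmt.Println(notMade == nil) // true: declared but never initialized with make

	buffered := make(chan int, 2)
	buffered <- 1 // returns at once: the buffer has room
	buffered <- 2 // returns at once: the buffer is now full
	fmt.Println(len(buffered), cap(buffered)) // 2 2
	// A third send here would block until some goroutine receives.

	unbuffered := make(chan int) // same as make(chan int, 0)
	fmt.Println(len(unbuffered), cap(unbuffered)) // 0 0
	done := make(chan struct{})
	go func() {
		fmt.Println("received", <-unbuffered)
		close(done)
	}()
	unbuffered <- 42 // completes only when the goroutine above takes the value
	<-done
}
```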

Why channels

Go implements the CSP communication model through channels and is mainly used for messaging and event notification between Goroutines.

With channels and goroutines, concurrent programming in Go becomes remarkably easy and safe, letting programmers focus on the business and work efficiently.

Keep in mind that technology is not the most important thing; it is just a tool for implementing the business. An efficient language saves you time for more meaningful things, like writing articles.

Channel implementation principle

At compile time, send and receive operations on a chan are converted into calls to the underlying runtime send and receive functions.

Channels come in two kinds: buffered and unbuffered. Operations on an unbuffered channel can be thought of as “synchronous mode”, and operations on a buffered channel as “asynchronous mode”.

In synchronous mode, the sender and receiver must rendezvous: data is transferred only when both are ready (essentially a memory copy, as you will see later). Otherwise, whichever side performs its send or receive first is suspended until the other side shows up and wakes it.

In asynchronous mode, sends and receives proceed smoothly as long as the buffer allows it: a send needs a free slot, and a receive needs a buffered element. Otherwise, the side performing the operation (for example, a send) is suspended until the opposite operation (for example, a receive) occurs.

To summarize: in synchronous mode, an operation succeeds only when a sender and a receiver are paired; otherwise it blocks. In asynchronous mode, a send succeeds only when the buffer has free capacity, and a receive only when the buffer holds elements; otherwise the operation blocks.
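One way to observe this rule from ordinary Go code is a select with a default case, which attempts an operation without blocking. The helper trySend is hypothetical; it simply reports whether the send could go through right now.

```go
package main

import "fmt"

// trySend attempts a non-blocking send: the default case runs
// instead of blocking when the send cannot proceed immediately.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // the send would have blocked
	}
}

func main() {
	unbuf := make(chan int)  // "synchronous mode"
	buf := make(chan int, 1) // "asynchronous mode" with one slot

	fmt.Println(trySend(unbuf, 1)) // false: no receiver is waiting to pair with
	fmt.Println(trySend(buf, 1))   // true: a free slot was available
	fmt.Println(trySend(buf, 2))   // false: the only slot is taken
}
```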

The data structure

Source code (Go 1.9.2):

type hchan struct {
	// Number of elements currently queued in chan
	qcount   uint
	// Length of the underlying circular array (the buffer capacity)
	dataqsiz uint
	// Pointer to the underlying circular array
	// Only used by buffered channels
	buf      unsafe.Pointer
	// Size of each element in chan
	elemsize uint16
	// Non-zero once chan has been closed
	closed   uint32
	// Element type of chan
	elemtype *_type // element type
	// Index in the circular array where the next element will be sent
	sendx    uint   // send index
	// Index in the circular array from which the next element will be received
	recvx    uint   // receive index
	// Queue of goroutines waiting to receive
	recvq    waitq  // list of recv waiters
	// Queue of goroutines waiting to send
	sendq    waitq  // list of send waiters

	// Protects all fields in hchan
	lock mutex
}

The meaning of each field is given in the comments. Let’s focus on a few of them:

buf points to the underlying circular array, which only buffered channels use.

sendx and recvx are indices into that array: the position where the next element will be sent and the position from which the next element will be received.

sendq and recvq hold the goroutines that are blocked trying to send to and receive from the channel, respectively.

waitq is a doubly linked list of sudog, and a sudog is essentially a wrapper around a goroutine:

type waitq struct {
	first *sudog
	last  *sudog
}

lock ensures that every read or write of the channel is atomic.
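To see how buf, dataqsiz, qcount, sendx and recvx cooperate, here is a deliberately simplified, single-goroutine model of the circular buffer. The ring type and its put/get methods are hypothetical illustrations, not runtime code: there is no lock and no sendq/recvq, and a full or empty buffer returns false where a real channel would block.

```go
package main

import "fmt"

// ring models only the circular-buffer bookkeeping of hchan.
type ring struct {
	buf    []int // plays the role of buf; len(buf) plays dataqsiz
	qcount int   // number of elements currently stored
	sendx  int   // next slot to write
	recvx  int   // next slot to read
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// put returns false when the buffer is full (where a real send would block).
func (r *ring) put(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) { // wrap around, like c.sendx = 0
		r.sendx = 0
	}
	r.qcount++
	return true
}

// get returns false when the buffer is empty (where a real receive would block).
func (r *ring) get() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot, like typedmemclr
	r.recvx++
	if r.recvx == len(r.buf) { // wrap around, like c.recvx = 0
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(3)
	for v := 1; v <= 3; v++ {
		r.put(v)
	}
	fmt.Println(r.put(4)) // false: full, and sendx == recvx == 0
	v, _ := r.get()       // takes 1 from slot 0
	r.put(4)              // writes into slot 0 again because sendx wrapped
	fmt.Println(v, r.sendx, r.recvx) // 1 1 1
}
```

Note that when the buffer is full, sendx and recvx point at the same slot, which is the situation the receive path relies on when a sender is waiting on a full buffer.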

For example, a channel whose elements are of type int and whose buffer length is 6 has the following data structure:

create

We know that a channel has two directions, send and receive. In theory we could create a send-only or receive-only channel directly, but how would we use it once it was created? How would anything receive from a send-only channel? Likewise, how would you send data to a receive-only channel?

In general, use make to create a channel that can send and receive:

// Unbuffered channel
ch1 := make(chan int)
// Buffered channel
ch2 := make(chan int, 10)

From assembly analysis, we know that the function that ultimately creates chan is makechan:

func makechan(t *chantype, size int64) *hchan

From the function prototype, we can see that makechan returns a pointer, *hchan. That is why we can pass channels between functions directly instead of passing a pointer to a channel.
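This pointer behavior is visible from user code: a channel passed by value still refers to the same underlying channel. The function name fill is illustrative.

```go
package main

import "fmt"

// fill receives the channel by value. Because a channel value refers to
// the runtime's hchan, the copy and the caller's variable are the same channel.
func fill(ch chan int) {
	ch <- 1
	ch <- 2
}

func main() {
	ch := make(chan int, 2)
	fill(ch)             // no *chan int needed
	fmt.Println(len(ch)) // 2: the caller sees the elements sent by fill

	alias := ch
	fmt.Println(alias == ch) // true: both values refer to the same channel
}
```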

Let’s look at the code:

const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign- 1))

func makechan(t *chantype, size int64) *hchan {
	elem := t.elem

	// Omit the code that checks channel size and alignment
	// ...

	var c *hchan
	// If the element type contains no pointers, or size is 0 (unbuffered),
	// make only one memory allocation
	if elem.kind&kindNoPointers != 0 || size == 0 {
		// If hchan holds no pointers, the GC will not scan the chan's elements
		// Allocate only "hchan struct size + element size * count" bytes
		c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
		// Buffered channel whose element size is not 0 (a zero-size element type is struct{})
		if size > 0 && elem.size != 0 {
			c.buf = add(unsafe.Pointer(c), hchanSize)
		} else {
			// race detector uses this location for synchronization
			// Also prevents us from pointing beyond the allocation (see issue 9401).
			// 1. Unbuffered: buf is unused, so point it at the start of chan
			// 2. Buffered with element type struct{}: also harmless,
			// because only the send and receive cursors are used; nothing is actually
			// copied to c.buf (which would overwrite the contents of chan)
			c.buf = unsafe.Pointer(c)
		}
	} else {
		// Perform two memory allocations
		c = new(hchan)
		c.buf = newarray(elem, int(size))
	}
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	// Loop array length
	c.dataqsiz = uint(size)

	// return the hchan pointer
	return c
}

A newly created chan is allocated on the heap and looks roughly like this:

For the record, this image comes from a GopherCon slide deck; the link is in the Resources section. The material is very clear and easy to follow, and I recommend reading it.

Next, let’s use an example from those resources to understand how a channel is created, sent to, and received from.

func goroutineA(a <-chan int) {
	val := <- a
	fmt.Println("G1 received data: ", val)
	return
}

func goroutineB(b <-chan int) {
	val := <- b
	fmt.Println("G2 received data: ", val)
	return
}

func main() {
	ch := make(chan int)
	go goroutineA(ch)
	go goroutineB(ch)
	ch <- 3
	time.Sleep(time.Second)
}

We first create an unbuffered channel, then launch two goroutines and pass the channel to both. Next, we send the value 3 to the channel, and finally sleep for one second before the program exits.

In line 14 we create an unbuffered channel. Let’s look at the overall state of the chan through a few important fields of its structure.

receive

Before continuing with the example from the previous section, let’s look at the source code for receiving. Once you understand how a receive works, the example will be easy to follow.

A receive can be written in two ways. One form includes “ok”, which reports whether the channel is closed. The other omits it, so you cannot tell whether a zero value came from a real sender or is the default zero value handed to the receiver because the channel is closed. Each form has its own use cases.
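A small example of the two forms, assuming a buffered channel that carries a genuine zero before being closed:

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 0 // a genuine zero value, sent by a sender
	close(ch)

	v, ok := <-ch
	fmt.Println(v, ok) // 0 true: the buffered value sent before close

	v, ok = <-ch
	fmt.Println(v, ok) // 0 false: closed and drained, so the zero value

	fmt.Println(<-ch) // 0: without ok, this looks just like the sent zero
}
```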

After being processed by the compiler, these two methods correspond to the two functions in the source code:

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)}func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

chanrecv1 handles the form without “ok”, while chanrecv2 also returns received, which tells whether the value actually came from a sender (it is false when the channel is closed). The received value itself is written to the address pointed to by the elem argument, much like an output parameter in C/C++. If the code ignores the received value, elem is nil.

Either way, both end up calling chanrecv:

// Located in src/runtime/chan.go

// chanrecv receives an element from channel c and writes it to the memory pointed to by ep.
// If ep is nil, the received value is ignored.
// If block == false and no element is available, it returns (false, false).
// Otherwise, if c is closed, it zeroes *ep and returns (true, false).
// Otherwise, it fills *ep with the received element and returns (true, true).
// A non-nil ep must point to the heap or to the caller's stack.

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// Omit debug content …………

	// If it is a nil channel
	if c == nil {
		// If not blocking, return (false, false)
		if !block {
			return
		}
		// Otherwise, receiving from a nil channel parks the goroutine forever
		gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
		// Never reached
		throw("unreachable")
	}

	// In non-blocking mode, detect failure quickly without acquiring the lock
	// When we observe that the channel is not ready to receive:
	// 1. Unbuffered, and no goroutine is waiting in sendq
	// 2. Buffered, but there are no elements in buf
	// and then also observe closed == 0, i.e. the channel is not closed.
	// Because a channel cannot be reopened, it was also open at the earlier observation,
	// so we can declare the receive failed and return (false, false)
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Lock
	lock(&c.lock)

	// The channel is closed and there are no elements in the circular array buf.
	// This covers a closed unbuffered channel and a closed buffered channel with an empty buf.
	// For a buffered channel, elements still in buf can be received
	// even after the channel is closed
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(unsafe.Pointer(c))
		}
		// Unlock
		unlock(&c.lock)
		if ep != nil {
			// Receiving from a closed channel without ignoring the value:
			// the received value is the zero value of the element type.
			// typedmemclr clears the memory at that address according to the type
			typedmemclr(c.elemtype, ep)
		}
		// Received from a closed channel: selected is true, received is false
		return true, false
	}

	// If a goroutine is waiting in sendq, the channel is either:
	// 1. An unbuffered channel, or
	// 2. A buffered channel whose buf is full
	// For 1, copy memory directly (sender goroutine -> receiver goroutine)
	// For 2, receive the element at the head of the circular array and put the sender's element at its tail
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// Buffered channel with elements in buf: receive normally
	if c.qcount > 0 {
		// Locate the element to receive directly in the circular array
		qp := chanbuf(c, c.recvx)

		// .....................

		// For "val := <-ch" (rather than a bare "<-ch"), ep points to val
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Clear the slot in the circular array
		typedmemclr(c.elemtype, qp)
		// Advance the receive cursor
		c.recvx++
		// Wrap the receive cursor back to zero
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// One fewer element in buf
		c.qcount--
		// Unlock
		unlock(&c.lock)
		return true, true
	}

	if !block {
		// Non-blocking receive: unlock. selected is false because no value was received
		unlock(&c.lock)
		return false, false
	}

	// The next step is to block
	// Construct a sudog
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

	// Save the address that will receive the data
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.param = nil
	// Join the channel's receive wait queue
	c.recvq.enqueue(mysg)
	// Suspend the current goroutine
	goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

	// Woken up; do some cleanup from here on
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

The code above is annotated in detail, so you can follow the source line by line. Let’s go through it step by step.

  • If the channel is nil, a non-blocking receive returns immediately. A blocking receive calls gopark to suspend the goroutine, and it stays blocked forever: with a nil channel, the only thing that could wake it would be closing the channel, but closing a nil channel panics, so there is no chance to be woken. See the closechan function for more details.

  • As in the send function, the next step detects failure quickly in non-blocking mode and returns without acquiring the lock. As an aside, when writing code, handling boundary conditions first and returning early makes the logic clearer: fewer special cases remain in the main path, so it stays focused and readers can concentrate on the core logic.

	// In non-blocking mode, detect failure quickly without acquiring the lock and return (false, false)
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

When we observe that a channel is not ready to receive:

  1. Unbuffered, and no goroutine is waiting in sendq
  2. Buffered, but there are no elements in buf

and we then also observe closed == 0, that is, the channel is not closed,

then, since a closed channel can never be reopened, the channel was also open at the time of the first observation. So in this case the receive can be declared failed and return immediately. It returns (false, false) because the case was not selected and no data was received.
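Both early exits can be observed from user code. A select with a default case performs a non-blocking receive; as far as I can tell from the Go 1.9 runtime, the compiler lowers this form to selectnbrecv, which calls chanrecv with block == false. The helper tryRecv is hypothetical.

```go
package main

import "fmt"

// tryRecv performs a non-blocking receive. ready is false when the
// receive would have blocked. (On a closed channel it reports true
// with the zero value, because such a receive never blocks.)
func tryRecv(ch <-chan int) (v int, ready bool) {
	select {
	case v = <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	var nilCh chan int          // nil: a blocking receive here would park forever
	fmt.Println(tryRecv(nilCh)) // 0 false: a nil channel is never ready

	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false: fast path, nothing to receive

	ch <- 7
	fmt.Println(tryRecv(ch)) // 7 true
}
```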

  • Next, the lock is acquired; its granularity is fairly coarse. If the channel is closed and the circular array buf holds no elements (which covers both a closed unbuffered channel and a closed buffered channel with an empty buf), the receiver gets the zero value of the element type, but the received flag is false, telling the caller that the channel is closed and the value was not sent by a sender. In a select, however, this case counts as selected. Many scenarios that use a channel as a notification signal end up here.

  • Next, if a goroutine is waiting in the send queue, the channel is “full”: either it is unbuffered, or it is buffered and buf is full. In both cases data can be received right away.

Hence, the recv function is called:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// If the channel is not buffered
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		// The received data is not ignored
		if ep != nil {
			// Copy data directly from sender goroutine -> Receiver Goroutine
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Buffered channel, and buf is full.
		// Copy the element at the head of the circular array buf to the receiving address,
		// then enqueue the sender's data. recvx and sendx are actually equal at this point
		// Find the receive cursor
		qp := chanbuf(c, c.recvx)
		// .....................
		// Copy the data at the receive cursor to the receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}

		// Copy the sender data to buF
		typedmemmove(c.elemtype, qp, sg.elem)
		// Update the cursor value
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx
	}
	sg.elem = nil
	gp := sg.g

	// Unlock
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}

	// Wake up the sending goroutine; it still has to wait to be scheduled
	goready(gp, skip+1)
}

If it is unbuffered, it is copied directly from the sender’s stack to the receiver’s stack.

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

Otherwise, the channel is buffered and buf is full. The send cursor and the receive cursor point to the same slot, so we first locate the receive cursor:

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

The element at the receive cursor is copied to the receiving address, and then the sender’s data is copied into that same slot. This completes both the receive and the send. The receive cursor then advances by one, wrapping to zero if it reaches the end, and the send cursor is set equal to it.

Finally, the goroutine is taken out of the sudog and goready is called to change its state to “runnable”; the woken sender then waits for the scheduler to run it.
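The full-buffer case can be reproduced with a buffered channel of capacity 2 and one extra sender. The helper drainWithParkedSender is hypothetical, and the sleep only makes it likely (not certain) that the third sender is already parked in sendq when the first receive runs; the receive order is the same either way.

```go
package main

import (
	"fmt"
	"time"
)

// drainWithParkedSender fills a 2-slot buffer, starts a third sender that
// has to wait, then receives all three values.
func drainWithParkedSender() []int {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2 // buffer full: recvx == sendx

	go func() {
		ch <- 3 // buffer is full, so this goroutine waits in ch's sendq
	}()
	// Crude: give the sender time to park. The order does not depend on it,
	// because 3 can only enter the buffer after 1 has left it.
	time.Sleep(10 * time.Millisecond)

	var got []int
	for i := 0; i < 3; i++ {
		got = append(got, <-ch)
	}
	return got
}

func main() {
	fmt.Println(drainWithParkedSender()) // [1 2 3]
}
```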

  • Next, if the channel’s buf still holds data, the receive proceeds normally. Note that this path is reached even if the channel is closed. The step is simple: copy the data at the receive cursor in buf to the receiving address.

  • At the last step, the receive has to block. Of course, if block is false, it does not block and simply returns.

First a sudog is constructed and various values are saved in it. Note that the address for the received data is stored in the elem field; by the time the goroutine is woken up, the received data has been written to the address this field points to. The sudog is then added to the channel’s recvq queue, and goparkunlock is called to suspend the goroutine.

The remaining code does the cleanup after the goroutine is woken up.
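The buffered branch noted earlier, which still works after close, can be checked directly: closing a buffered channel does not discard its elements. drain is an illustrative helper.

```go
package main

import "fmt"

// drain receives until the channel is closed AND empty;
// a range loop over a channel exits only when both are true.
func drain(ch <-chan string) []string {
	var got []string
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan string, 3)
	ch <- "a"
	ch <- "b"
	close(ch) // closing does not discard what is still buffered

	fmt.Println(drain(ch)) // [a b]

	v, ok := <-ch
	fmt.Printf("%q %v\n", v, ok) // "" false
}
```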

Let’s continue with our earlier example. Line 14 creates an unbuffered channel, and lines 15 and 16 each start a goroutine that performs a receive. From the source analysis above, we know that both goroutines (G1 and G2 from here on) block in the receive. G1 and G2 wait in the channel’s recvq queue, forming a doubly linked list.

Before line 17 of the program, chan’s overall data structure is as follows:

buf points to an array of length 0, and qcount is 0, meaning there are no elements in the channel. Focus on recvq and sendq: both are waitq structures, and a waitq is actually a doubly linked list whose elements are sudogs. A sudog contains a g field, and g represents a goroutine, so a sudog can be thought of as a waiting goroutine. recvq stores goroutines that tried to read from the channel and were blocked, and sendq stores goroutines that tried to write to the channel and were blocked.
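To make the waitq shape concrete, here is a stripped-down, hypothetical model of `sudog` and `waitq` with first/last pointers and prev/next links. It is a sketch, not the runtime code: the real `dequeue` also has to skip sudogs belonging to a `select` that has already been won by another case, which is omitted here.

```go
package main

import "fmt"

// sudog and waitq are hypothetical, stripped-down models of the runtime types
// of the same name: a sudog wraps one waiting goroutine (here just a name),
// and a waitq is a FIFO doubly linked list with first/last pointers.
type sudog struct {
	g          string
	prev, next *sudog
}

type waitq struct {
	first, last *sudog
}

// enqueue appends s at the tail of the queue.
func (q *waitq) enqueue(s *sudog) {
	s.next = nil
	tail := q.last
	if tail == nil { // empty queue: s becomes both first and last
		s.prev = nil
		q.first, q.last = s, s
		return
	}
	s.prev = tail
	tail.next = s
	q.last = s
}

// dequeue removes and returns the head of the queue, or nil if it is empty.
func (q *waitq) dequeue() *sudog {
	s := q.first
	if s == nil {
		return nil
	}
	q.first = s.next
	if q.first == nil { // the queue became empty
		q.last = nil
	} else {
		q.first.prev = nil
	}
	s.next = nil
	return s
}

func main() {
	var recvq waitq
	recvq.enqueue(&sudog{g: "G1"})      // G1 blocks on <-ch first
	recvq.enqueue(&sudog{g: "G2"})      // then G2
	fmt.Println(recvq.dequeue().g)      // G1: the earliest waiter is served first
	fmt.Println(recvq.dequeue().g)      // G2
	fmt.Println(recvq.dequeue() == nil) // true
}
```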

At this point recvq holds two goroutines, G1 and G2, which were started earlier. They are blocked because no goroutine has sent anything yet and the channel is unbuffered. sendq has no blocked goroutines.

The data structure of recvq is shown below, in a figure taken directly from the referenced article, with nicely drawn three-dimensional elements:

Let’s look at chan’s state as a whole:

G1 and G2 are suspended, and their state is WAITING. The goroutine scheduler is not today's focus (there will certainly be a separate article on it later), but briefly: a goroutine is a user-mode coroutine managed by the Go runtime, as opposed to a kernel thread managed by the OS. Goroutines are much lighter, so we can easily create tens of thousands of them.

A kernel thread can manage multiple Goroutines, and when one of them blocks, the kernel thread can schedule other goroutines to run. The kernel thread itself does not block. This is what we usually call the M:N model:

In Go, the M:N model consists of three parts: M, P and G. M is a kernel thread that runs goroutines; P is a context that holds what a goroutine needs to run and maintains a list of runnable goroutines; G is a goroutine to be run. M and P together are what G needs in order to run.
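A quick way to see how cheap parked goroutines are is to start thousands of them, let them all block on one channel, and release them together. This is only an illustrative sketch (`spawnBlocked` is a hypothetical helper); `runtime.GOMAXPROCS(0)` reports the current number of Ps without changing it.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// spawnBlocked starts n goroutines that all block receiving from start,
// releases them at once by closing start, and returns how many ran.
func spawnBlocked(n int) int {
	start := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	ran := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-start // parked in WAITING; the M is free to run other goroutines
			mu.Lock()
			ran++
			mu.Unlock()
		}()
	}
	close(start) // wakes every parked receiver
	wg.Wait()
	return ran
}

func main() {
	fmt.Println("Ps (GOMAXPROCS):", runtime.GOMAXPROCS(0))
	fmt.Println("goroutines run:", spawnBlocked(10000)) // goroutines run: 10000
}
```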

Back to the example. When G1 (go goroutineA(ch)) reaches val := <-a, it goes from running to waiting (via gopark):

G1 disconnects from M, but the scheduler can’t keep M idle, so it schedules another Goroutine to run:

The same goes for G2. Now G1 and G2 are suspended, waiting for a sender to send data to the channel.

send

Continuing the example above, G1 and G2 are now in the recvq queue.

ch <- 3

Line 17 sends an element 3 to the channel.

A send operation is eventually turned into a call to chansend. Here is the source; most of it is commented, so the main flow should be easy to follow:

// Located in src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// If the channel is nil
	if c == nil {
		// Cannot block: return false to indicate that the value was not sent
		if !block {
			return false
		}
		// Suspend the current goroutine (forever)
		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	// Debugging-related code omitted ......

	// For a non-blocking send, quickly detect failure scenarios.
	//
	// The channel is not closed and has no room for the value. This could be:
	// 1. The channel is unbuffered and there is no goroutine in the receive queue
	// 2. The channel is buffered, but the circular array is full
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Lock the channel for concurrency safety
	lock(&c.lock)

	// If the channel is closed
	if c.closed != 0 {
		// Unlock
		unlock(&c.lock)
		// Panic directly
		panic(plainError("send on closed channel"))
	}

	// If a goroutine is waiting in the receive queue, copy the data directly to it
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// For buffered channels, if there is space in the buffer
	if c.qcount < c.dataqsiz {
		// qp points to the sendx slot of buf
		qp := chanbuf(c, c.sendx)

		// ...

		// Copy the data from ep to qp
		typedmemmove(c.elemtype, qp, ep)
		// Advance the send cursor by 1
		c.sendx++
		// If the send cursor reaches the capacity, wrap it back to 0
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// The number of elements in the buffer goes up by one
		c.qcount++

		// Unlock
		unlock(&c.lock)
		return true
	}

	// If blocking is not allowed, report failure
	if !block {
		unlock(&c.lock)
		return false
	}

	// The channel is full and the sender has to block. Construct a sudog

	// Get a pointer to the current goroutine
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil

	// The current goroutine enters the send wait queue
	c.sendq.enqueue(mysg)

	// Suspend the current goroutine
	goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

	// Someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		// Woken up because the channel was closed: panic
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// Unbind the channel from mysg
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

The code above is fairly well commented; let's walk through it step by step.

  • If the channel is nil: a non-blocking send returns false immediately, and a blocking send suspends the current goroutine forever.

  • For a non-blocking send, if the channel is not closed and has no room for the value (that is, a. the channel is unbuffered and no goroutine is waiting in the receive queue, or b. the channel is buffered but the circular array is full), the send fails immediately.

The runtime source has a long comment on this check. Its purpose is to detect, without taking the lock, that a non-blocking send would fail, and to return quickly.

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
	return false
}

The comment explains why this check can be done without holding the lock, so let me go through it in detail. The if condition first reads two variables: block and c.closed. block is a function argument and never changes; c.closed can be changed by other goroutines, since no lock is held. These are the first two operands of the && expression.

The last operand involves three variables: c.dataqsiz, c.recvq.first and c.qcount. c.dataqsiz == 0 && c.recvq.first == nil describes an unbuffered channel with no goroutine waiting to receive in recvq; c.dataqsiz > 0 && c.qcount == c.dataqsiz describes a buffered channel whose circular array is full. c.dataqsiz never actually changes, since it is fixed when the channel is created, so the lack of locking really only affects c.qcount and c.recvq.first.

So the condition consists of two single-word reads: c.closed, and either c.recvq.first (unbuffered) or c.qcount (buffered).

If c.closed == 0 holds, the channel is not closed. If in addition c.recvq.first == nil or c.qcount == c.dataqsiz holds (c.dataqsiz can be ignored here), the send is judged to fail and false is returned immediately.

Two observations are involved: "the channel is not closed" and "the channel is not ready for sending". Because no lock is held, the two observations may not be consistent with each other. For example, we may observe that the channel is not closed and then that it is not ready for sending, so the if condition is satisfied; yet c.closed may have become 1 in between, in which case the condition would no longer hold at the moment of the second read.

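However, a closed channel cannot change from "ready for sending" to "not ready for sending". So even if the channel was closed between the two observations (c.closed == 1 by then), there must have been a moment between them when the channel was both not yet closed and not ready for sending. The runtime behaves as if it observed the channel at that moment and reports that the send cannot proceed. The reads may also be reordered: if "not ready for sending" is observed first and "not closed" second, then the channel was certainly not closed at the time of the first observation either.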

The reasoning is rather subtle, but the goal is simple: avoid taking the lock, to improve performance.

  • If the channel is closed, panic occurs.

  • If a sudog (representing a goroutine) can be dequeued from recvq, there is a waiting receiver, which means the channel's buffer is empty. The send function is called to copy the element directly from the sender's stack to the receiver's stack, with the sendDirect function doing the critical work.

// send handles a send on an empty channel.

// The value ep sent by the sender is copied directly to the receiving goroutine,
// which is then woken up.
// c must be empty (a goroutine is waiting in the receive queue, so the buffer must be empty).
// c must be locked; after the send completes, unlockf is used to unlock it.
// sg must already have been removed from the wait queue.
// ep must be non-nil and point to the heap or the caller's stack.

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	// Some code that is not needed here is omitted
	// ...

	// sg.elem points to where the received value is stored, e.g. &val for val := <-ch
	if sg.elem != nil {
		// Copy memory directly (from the sender to the receiver)
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	// The goroutine bound to the sudog
	gp := sg.g
	// Unlock
	unlockf()
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Wake up the receiving goroutine; skip is related to stack printing
	goready(gp, skip+1)
}

Continue with the sendDirect function:

// Sends and receives on unbuffered or empty buffered channels are the only
// operations in which one running goroutine writes directly to the stack of another goroutine.
// The GC assumes that writes to a stack only happen while its goroutine is running
// and are done only by that goroutine, so this violates that assumption.
// A write barrier is used to make up for it.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on the current goroutine's stack; dst is a slot on another goroutine's stack

	// Perform the memory "move".
	// Once sg.elem has been read, it is no longer updated if the destination
	// stack gets copied (shrunk), so there must be no preemption point
	// between reading dst and using it.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	memmove(dst, src, t.size)
}

Here one goroutine writes directly to another goroutine's stack. Normally each goroutine's stack is private to that goroutine, and writing to it from outside also violates some of the GC's assumptions. To avoid problems, a write barrier is applied during the write so that it completes correctly. The benefit is one less memory copy: the value does not have to go through the channel's buf first, but is sent straight from the sender to the receiver. No middleman takes a cut, and efficiency improves. Perfect.

Then the channel is unlocked and the receiver is woken up. When the scheduler gets to it, the receiver continues executing the code after its receive operation.

  • If c.qcount < c.dataqsiz, buffer space is available (so this must be a buffered channel). First, chanbuf is used to find the slot where the element should go:
qp := chanbuf(c, c.sendx)

// chanbuf returns the address of the i'th slot in the circular queue
func chanbuf(c *hchan, i uint) unsafe.Pointer {
	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

c.sendx points to the slot in the circular array where the next element to be sent goes, and typedmemmove copies the element into it. Then c.sendx is incremented by 1 (wrapping back to 0 when it reaches the capacity), and the element count is incremented with c.qcount++. Finally, the lock is released and the function returns.

  • If none of the preceding conditions holds, the channel has no room for the value: whether it is buffered or unbuffered, the sender is stuck (the goroutine would block). If block is false, the lock is released and false is returned.

  • Finally, there is the case where the sender really does need to block. A sudog is constructed and enqueued on the channel's sendq. Then goparkunlock is called to release the lock and suspend the current goroutine until it is woken up at an appropriate time.

After waking up, proceed from the next line of goparkunlock.

The sudog binds the goroutine through its g field, and the goroutine points back to the sudog through its waiting field. The sudog also stores the address of the element to be sent in its elem field, and its c field records the channel the goroutine is stuck on.

So the address of the element to be sent is actually stored in the sudog, which represents the current goroutine.
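From user code, the non-blocking fast path and the buffered send path can be observed with a select/default send. Below is a small sketch; `trySend` is a hypothetical helper name. A `select` with one send case and a `default` branch is compiled into a chansend call with block == false, and `len`/`cap` of a channel report qcount and dataqsiz. The slot and cursor comments describe what the runtime code above does internally; the program cannot see them directly.

```go
package main

import "fmt"

// trySend performs a non-blocking send: when the send cannot proceed,
// it returns false instead of parking the goroutine.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: no receiver waiting in recvq

	buffered := make(chan int, 2)                           // dataqsiz == 2
	fmt.Println(trySend(buffered, 1), trySend(buffered, 2)) // true true (sendx wraps to 0)
	fmt.Println(len(buffered), cap(buffered))               // 2 2: len is qcount, cap is dataqsiz
	fmt.Println(trySend(buffered, 3))                       // false: qcount == dataqsiz
	<-buffered                                              // frees slot 0, recvx moves to 1
	fmt.Println(trySend(buffered, 3))                       // true: 3 goes into slot 0
	fmt.Println(<-buffered, <-buffered)                     // 2 3: FIFO order survives the wrap
}
```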

That's it for the source code. Let's return to the example. You've probably forgotten it by now, so here is the code again:

func goroutineA(a <-chan int) {
	val := <-a
	fmt.Println("goroutine A received data: ", val)
	return
}

func goroutineB(b <-chan int) {
	val := <-b
	fmt.Println("goroutine B received data: ", val)
	return
}

func main() {
	ch := make(chan int)
	go goroutineA(ch)
	go goroutineB(ch)
	ch <- 3
	time.Sleep(time.Second)
}

In the receive section we saw that G1 and G2 are now suspended, waiting for a sender to rescue them. At line 17, the main goroutine sends the element 3 to ch; let's see what happens next.

The sender finds that receivers are waiting in ch's recvq, so it dequeues the first sudog, which belongs to G1, and G1 is then added to P's runnable goroutine queue.

The sender copies the element being sent to the address in the sudog's elem field, and finally wakes up G1 by calling goready, which changes its state to runnable.

When the scheduler gets to G1, it puts G1 in the running state, and G1 continues executing the rest of goroutineA. (In the figure, G stands for any other goroutines that may exist.)

This involves one goroutine writing to another goroutine's stack. Two receivers are waiting eagerly on one side of the channel, and a sender on the other side is ready to send data. For efficiency, instead of "forwarding" the value through the channel's buf, the data is copied directly from the source address to the destination address. Efficient!

In the diagram above, 3 is copied to a location on G1's stack: the address of val, which was stored in the elem field.

Close

Closing a channel executes the closechan function:

func closechan(c *hchan) {
	// Closing a nil channel panics
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	// Lock
	lock(&c.lock)
	// If the channel is already closed
	if c.closed != 0 {
		unlock(&c.lock)
		// panic
		panic(plainError("close of closed channel"))
	}

	// .....................

	// Mark the channel as closed
	c.closed = 1

	var glist *g

	// Release all sudogs in the channel's receive wait queue
	for {
		// Dequeue a sudog from the receive queue
		sg := c.recvq.dequeue()
		// Exit the loop when the queue is empty
		if sg == nil {
			break
		}

		// If elem is not nil, the receiver wants the received value:
		// give it the zero value of the element type
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// Get the goroutine
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, unsafe.Pointer(c))
		}
		// Link it into the list
		gp.schedlink.set(glist)
		glist = gp
	}

	// Release all sudogs in the channel's send wait queue.
	// These goroutines will panic
	for {
		// Dequeue a sudog from the send queue
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}

		// The sender will panic
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, unsafe.Pointer(c))
		}
		// Link it into the list
		gp.schedlink.set(glist)
		glist = gp
	}
	// Unlock
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	// Walk the list
	for glist != nil {
		// Take the head of the list
		gp := glist
		// Move forward to the next G to wake up
		glist = glist.schedlink.ptr()
		gp.schedlink = 0
		// Wake up the corresponding goroutine
		goready(gp, 3)
	}
}

The close logic is simple. recvq and sendq hold a channel's blocked receivers and senders, respectively. When the channel is closed, each waiting receiver receives the zero value of the element type, while each waiting sender panics. Therefore, do not close a channel rashly when you don't know whether it still has senders.

closechan first, while holding the lock, collects the goroutines of all senders and receivers waiting on the channel into a linked list, then releases the lock, and finally wakes them all up.

Once woken up, each goroutine does what it has to do. A sender continues executing the code after goparkunlock in chansend, finds that the channel has been closed and, unfortunately, panics. A receiver is luckier: it does some cleanup and returns. Here selected returns true, while received depends on whether the channel was closed: it is false if the channel is closed and true otherwise. So in the case we are analyzing, received is false.
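The effect of closechan on both queues can be seen from user code. In the sketch below (the helper names are hypothetical), every receiver on a closed channel gets the zero value with ok == false, whether it was already parked in recvq or arrives after the close, and a send on a closed channel panics with "send on closed channel", recovered here so it can be inspected.

```go
package main

import (
	"fmt"
	"sync"
)

// closeWakesReceivers starts n receivers on an unbuffered channel, closes it,
// and collects what each receiver got as "value ok".
func closeWakesReceivers(n int) []string {
	ch := make(chan int)
	results := make(chan string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, ok := <-ch // parks in recvq unless the close has already happened
			results <- fmt.Sprint(v, ok)
		}()
	}
	close(ch) // waiting receivers get the zero value and ok == false
	wg.Wait()
	close(results)
	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

// sendOnClosed returns the panic message produced by sending on a closed channel.
func sendOnClosed() (msg string) {
	defer func() { msg = fmt.Sprint(recover()) }()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1
	return "no panic"
}

func main() {
	fmt.Println(closeWakesReceivers(3)) // [0 false 0 false 0 false]
	fmt.Println(sendOnClosed())         // send on closed channel
}
```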

Advanced channel topics

To summarize the results of operations on a channel:

operation   | nil channel | closed channel | non-nil, not closed channel
close       | panic       | panic          | closes normally
read <-ch   | blocks      | reads any remaining buffered values, then the zero value of the element type | blocks or reads data normally; blocks when a buffered channel's buf is empty or an unbuffered channel has no waiting sender
write ch <- | blocks      | panic          | blocks or writes data normally; blocks when an unbuffered channel has no waiting receiver or a buffered channel's buf is full

In short, a panic can occur in three ways: writing to a closed channel, closing a nil channel, and closing a channel twice.

Reading from and writing to a nil channel both block forever.
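The nil-channel and double-close rows of the table can be checked with a few lines. In this sketch (helper names are hypothetical), a receive on a nil channel inside a `select` is simply never ready, so `default` runs; this is why setting a channel variable to nil is a common way to disable a `select` case. The close panics are recovered so their messages can be printed.

```go
package main

import "fmt"

// closeResult closes ch and returns "ok", or the panic message if close panics.
func closeResult(ch chan int) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(ch)
	return "ok"
}

// pollNil tries a receive on a nil channel inside a select.
// A nil channel is never ready, so the default branch is always taken.
func pollNil() string {
	var nilCh chan int
	select {
	case <-nilCh:
		return "received"
	default:
		return "not ready"
	}
}

func main() {
	var nilCh chan int
	fmt.Println(pollNil())          // not ready
	fmt.Println(closeResult(nilCh)) // close of nil channel
	c := make(chan int)
	fmt.Println(closeResult(c)) // ok
	fmt.Println(closeResult(c)) // close of closed channel
}
```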

The nature of the send and receive elements

What is the nature of a Channel send and receive element? The answer is as follows:

Remember all transfer of value on the go channels happens with the copy of value.

That is, channel sends and receives are essentially "copies of values": from the sender goroutine's stack to chan buf, from chan buf to the receiver goroutine, or directly from the sender goroutine to the receiver goroutine.

Here is another example from that article, which I'll explain in more detail. By the way, it is an English blog post and well written; instead of large chunks of source code analysis like this article, it walks through each case with code. Each approach has its pros and cons. I recommend reading the original; the reading experience is better.

package main

import (
	"fmt"
	"time"
)

type user struct {
	name string
	age  int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
	fmt.Println("modifyUser Received Vaule", pu)
	pu.name = "Anand"
}

func printUser(u <-chan *user) {
	time.Sleep(2 * time.Second)
	fmt.Println("printUser goRoutine called", <-u)
}

func main() {
	c := make(chan *user, 5)
	c <- g
	fmt.Println(g)
	// modify g
	g = &user{name: "Ankur Anand", age: 100}
	go printUser(c)
	go modifyUser(g)
	time.Sleep(5 * time.Second)
	fmt.Println(g)
}

Running results:

&{Ankur 25}
modifyUser Received Vaule &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

This is a good example of sharing memory by communicating.

First, a struct u is constructed at address 0x56420, with its contents shown above the address in the figure. Then &u is assigned to the pointer g, whose own address is 0x565bb0 and whose content is the address of u.

In main, g is first sent to c. Because of copy-by-value semantics, what goes into chan buf is 0x56420, the value of the pointer g (not what it points to). Reassigning g afterwards therefore does not affect the copy in the buffer, so the element received from the channel prints as &{Ankur 25}. In other words, we do not "send" the variable g into the channel; we just copy its value.

Once again:

Remember all transfer of value on the go channels happens with the copy of value.
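A minimal sketch of the same rule with two element types (the helper names are hypothetical): sending a struct copies the whole struct, so later changes to the original are invisible to the receiver, while sending a pointer copies only the address, so the receiver sees the change.

```go
package main

import "fmt"

type user struct {
	name string
	age  int8
}

// viaValue sends a user by value, modifies the original, and returns the
// name seen by the receiver.
func viaValue() string {
	u := user{name: "Ankur", age: 25}
	ch := make(chan user, 1)
	ch <- u // the whole struct is copied into the buffer
	u.name = "changed"
	return (<-ch).name
}

// viaPointer sends a *user, modifies the pointee, and returns the name
// seen by the receiver.
func viaPointer() string {
	u := user{name: "Ankur", age: 25}
	ch := make(chan *user, 1)
	ch <- &u // only the pointer value (an address) is copied
	u.name = "changed"
	return (<-ch).name
}

func main() {
	fmt.Println(viaValue(), viaPointer()) // Ankur changed
}
```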

Resource leaks

A Channel may cause a Goroutine leak.

A leak happens when a goroutine operating on a channel stays blocked in a send or receive because the channel remains full (or empty) forever. The garbage collector does not reclaim such a goroutine, so it waits in the queue and never sees the light of day.

The "resource leaks" section of Chapter 8 (channel) in Yuhen's book "Go Language Study Notes" gives an example that is worth a look.

happened before

Wikipedia definition:

In computer science, the happened-before relation (denoted: ->) is a relation between the result of two events, such that if one event should happen before another event, the result must reflect that, even if those events are in reality executed out of order (usually to optimize program flow).

Put simply, if there is a happened-before relationship between events a and b, written a -> b, then the results of a and b must reflect that relationship. Because modern compilers and CPUs perform all sorts of optimizations, including compiler reordering and memory reordering, happened-before constraints are essential in concurrent code.

According to Chao Yuepan's talk on concurrent programming at Gopher China 2019, the happened-before relationships among a channel's send, send finished, receive, and receive finished are as follows:

  1. The nth send happens before the nth receive finishes, for both buffered and unbuffered channels.
  2. For a buffered channel with capacity m, the nth receive happens before the (n+m)th send finishes.
  3. For an unbuffered channel, the nth receive happens before the nth send finishes.
  4. Closing a channel happens before a receiver is notified of the close.

Let’s explain them one by one.

First, looking at the source code, a send does not necessarily happen before the receive begins: sometimes the receive comes first, its goroutine is suspended, and a sender later wakes it up, so the send happens after the receive started. But in any case, for a receive to finish, there must have been a send.

When the (n+m)th send happens on a buffered channel, there are two situations:

If the nth receive has not happened yet, the channel is full and the send blocks. When the nth receive does happen, the sender goroutine is woken up and continues. So the nth receive happens before the (n+m)th send finishes.

If the nth receive has already happened, the requirement is met directly.

The third rule is also easy to understand. If the nth send blocks, the sender goroutine is suspended, and the nth receive arrives before the nth send finishes. If the nth send does not block, the nth receive was already waiting, so it happened not only before the send finished but even before the send started.

For the fourth rule, recall closechan: it first sets closed = 1, and only then wakes up the waiting receivers and copies the zero value to them.
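Rule 2 is what makes a buffered channel usable as a counting semaphore; the Go memory model documentation uses the same idea to limit how many goroutines run at once. A minimal sketch, assuming we only want to check that at most `limit` goroutines are ever inside the guarded section (`runLimited` is a hypothetical helper):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runLimited starts n goroutines but lets at most limit of them be in the
// guarded section at once, using a buffered channel as a counting semaphore.
// It returns the highest concurrency it observed.
func runLimited(n, limit int) int32 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, maxSeen int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire: the (k+limit)th send waits for the kth receive
			defer func() { <-sem }() // release: frees a slot for a blocked sender
			cur := atomic.AddInt32(&running, 1)
			for { // record the maximum concurrency seen so far
				old := atomic.LoadInt32(&maxSeen)
				if cur <= old || atomic.CompareAndSwapInt32(&maxSeen, old, cur) {
					break
				}
			}
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&maxSeen)
}

func main() {
	fmt.Println(runLimited(100, 3) <= 3) // true
}
```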

Chao's slides from Gopher China 2019 can be downloaded from the comments section of this blog post.

As for happened-before, here is another example from Chai and Cao's new book, Advanced Go Programming.

Section 1.5 of the book begins with the sequential consistency memory model, which is the foundation of concurrent programming.

Let’s go straight to the example:

package main

var done = make(chan bool)
var msg string

func aGoroutine() {
	msg = "hello, world"
	done <- true
}

func main() {
	go aGoroutine()
	<-done
	println(msg)
}

We define a done channel and a string to print. main starts a goroutine, waits for a value from done, and then prints msg. Without the <-done line in main, msg would most likely print as empty: aGoroutine would probably not have been scheduled to assign msg yet, and the program would exit, because in Go the main goroutine does not wait for other goroutines before exiting.

With the <-done line added, main blocks there. Once aGoroutine sends a value on done, main wakes up and goes on to print msg. msg was assigned before that, so hello, world is printed.

This relies on the first happened-before rule: done <- true happens before <-done finishes. Since msg is assigned before done <- true in aGoroutine, by the time main runs println(msg) after <-done, msg has already been assigned, so the expected result is printed.

The book then takes the third happened-before rule a step further and changes the code:

package main

var done = make(chan bool)
var msg string

func aGoroutine() {
	msg = "hello, world"
	<-done
}

func main() {
	go aGoroutine()
	done <- true
	println(msg)
}

You get the same result. Why? By rule 3, for an unbuffered channel, the first receive happens before the first send finishes. That is, <-done happens before done <- true completes, which means msg has already been assigned, so hello, world is eventually printed. (With a buffered done channel this guarantee would not hold.)

How to gracefully close a channel

This part is based on the English article "How to Gracefully Close Channels" from Go 101, which can be found in the references section.

The article first pokes fun at some design problems of Go channels, then gives several examples of how to close a channel gracefully in different situations. As usual, I will give my own interpretation based on the original author's content; after reading this section, it is well worth going back to the original English.

There are a few drawbacks to using channels:

  1. There is no simple, universal way to check whether a channel is closed without changing its state.
  2. Closing a closed channel causes a panic. So it is dangerous for the closing party to close a channel without knowing whether it is already closed.
  3. Sending data to a closed channel causes a panic. So it is dangerous for a sender to send data to a channel without knowing whether it is closed.

The article offers a function that checks whether a channel is closed:

package main

import "fmt"

// T is a placeholder element type
type T int

func IsClosed(ch <-chan T) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

func main() {
	c := make(chan T)
	fmt.Println(IsClosed(c)) // false
	close(c)
	fmt.Println(IsClosed(c)) // true
}

Looking at the code, there are plenty of problems. First, IsClosed has side effects: whenever a value is available on the channel, a call receives it, changing the channel's state (and reporting an open channel as closed). Not a good function: it does its job and steals along the way!

Second, IsClosed only reflects the moment it was called; nothing stops another goroutine from changing the channel's state right after the call. For example, IsClosed may return false, and then another goroutine closes the channel; if you send data on the strength of that outdated "not closed" answer, you panic. The reverse cannot happen: a closed channel can never be reopened, so a "closed" state never goes stale.
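To make the side effect concrete, here is a small sketch using the same select-based logic as IsClosed above, with the element type fixed to int and a lowercase name to keep it separate: on an open buffered channel that holds a value, it reports "closed" and silently consumes the value.

```go
package main

import "fmt"

// isClosed is the select-based check from above, with T replaced by int.
func isClosed(ch <-chan int) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

func main() {
	ch := make(chan int, 1)
	ch <- 42
	fmt.Println(isClosed(ch)) // true, although ch is still open
	fmt.Println(len(ch))      // 0: the check consumed the 42
}
```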

There is a popular principle for closing a channel:

don’t close a channel from the receiver side and don’t close a channel if the channel has multiple concurrent senders.

In other words: don't close a channel from the receiver side, and don't close a channel that has multiple concurrent senders.

That makes sense: it is the sender that puts elements into the channel, so the sender can decide when to stop sending and close the channel. But with multiple senders, no single sender knows whether the others are done, so none of them can safely close the channel.

But the above is not the most essential, the most essential principle is only one:

don’t close (or send values to) closed channels.

There are two less elegant ways to close a channel:

  1. Use the defer-recover mechanism and close the channel or send to it freely. If a panic occurs, defer-recover acts as a safety net.

  2. Use sync.Once to make sure the channel is closed only once.

I won't post that code here; see the original article.
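For a rough idea of the sync.Once approach, here is a minimal sketch (not the original article's code). `safeChan` and `SafeClose` are hypothetical names. It only prevents a double close; it does not stop anyone from sending on the channel after it has been closed.

```go
package main

import (
	"fmt"
	"sync"
)

// safeChan is a hypothetical wrapper, not a standard library type: SafeClose
// may be called any number of times from any goroutine and closes C once.
type safeChan struct {
	C    chan int
	once sync.Once
}

func newSafeChan() *safeChan { return &safeChan{C: make(chan int)} }

// SafeClose closes C on the first call; later calls are no-ops, not panics.
func (s *safeChan) SafeClose() {
	s.once.Do(func() { close(s.C) })
}

func main() {
	s := newSafeChan()
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.SafeClose() // ten concurrent closers, no "close of closed channel"
		}()
	}
	wg.Wait()
	_, ok := <-s.C
	fmt.Println(ok) // false: the channel is closed
}
```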

How do you gracefully close a channel?

According to the number of sender and receiver, there are the following cases:

  1. One sender, one receiver
  2. One sender, M receivers
  3. N senders, one receiver
  4. N senders, M receivers

For cases 1 and 2 there is not much to say: there is only one sender, so simply close the channel from the sender side. The interesting cases are 3 and 4.
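Case 2 (one sender, M receivers) in code: a minimal sketch in which the single sender closes the data channel once it is done, and every receiver's `for range` loop ends after the channel is closed and drained. `fanOut` is a hypothetical helper that sums what was received, so the result can be checked.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut sends 1..n from a single sender to m receivers and returns the sum
// of everything received. Only the sender closes dataCh, so the channel is
// never closed twice and nobody sends on it after the close.
func fanOut(n, m int) int {
	dataCh := make(chan int, 10)
	var mu sync.Mutex
	var wg sync.WaitGroup
	sum := 0

	for r := 0; r < m; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range dataCh { // ends once dataCh is closed and drained
				mu.Lock()
				sum += v
				mu.Unlock()
			}
		}()
	}

	for i := 1; i <= n; i++ {
		dataCh <- i
	}
	close(dataCh) // the sole sender closes the channel

	wg.Wait()
	return sum
}

func main() {
	fmt.Println(fanOut(100, 4)) // 5050
}
```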

For the third case, the advice is: the only receiver says "please stop sending more" by closing an additional signal channel.

The solution is to add a channel that carries the stop signal; the receiver uses this signal channel to tell the senders to stop. The senders listen for the stop signal and stop sending data. I've simplified the code a bit:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const Max = 100000
	const NumSenders = 1000

	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})

	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				select {
				case <-stopCh:
					return
				case dataCh <- rand.Intn(Max):
				}
			}
		}()
	}

	// the receiver
	go func() {
		for value := range dataCh {
			if value == Max-1 {
				fmt.Println("send stop signal to senders.")
				close(stopCh)
				return
			}

			fmt.Println(value)
		}
	}()

	select {
	case <-time.After(time.Hour):
	}
}

stopCh here is the signal channel. It has only one sender (the goroutine receiving from dataCh), so that goroutine can simply close it. Once the senders receive the stop signal, the select branch case <-stopCh is chosen, the goroutine returns, and no more data is sent.

Note that the code above never explicitly closes dataCh. In Go, once no goroutine references a channel any more, it will eventually be garbage collected, whether or not it was closed. So here, "gracefully closing" the channel means not closing it at all and letting the GC take care of it.

Finally, the fourth case (N senders, M receivers). The graceful way: any one of them says "let's end the game" by notifying a moderator to close an additional signal channel.

Unlike the third case, there are now M receivers. If we used the third solution directly and let a receiver close stopCh, stopCh could be closed more than once, causing a panic. So we add a middleman: the M receivers send their "requests" to close dataCh to it, and after receiving the first request the middleman issues the command to close dataCh by closing stopCh. There is no repeated close, because only the middleman closes stopCh. The N senders can likewise send requests to the middleman.

package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})

	// It must be a buffered channel.
	toStop := make(chan string, 1)

	var stoppedBy string

	// moderator
	go func() {
		stoppedBy = <-toStop
		fmt.Println("stopped by", stoppedBy) // stoppedBy must be read, or Go 1.18+ rejects it as unused
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			for {
				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}

					fmt.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	select {
	case <-time.After(time.Hour):
	}
}

toStop is the middleman's channel in the code: it receives the requests to close dataCh from the senders and receivers.

toStop is declared as a buffered channel. If it were unbuffered, the first request to close dataCh could be lost: both senders and receivers send their requests through select statements with a default branch, so if the moderator goroutine is not yet ready to receive, the send case cannot proceed, default is taken, and nothing happens. The first request to close dataCh is then lost.

If we instead declare the capacity of toStop as NumSenders + NumReceivers, the code that sends the close request can be written more tersely:

...
toStop := make(chan string, NumReceivers+NumSenders)
...
			value := rand.Intn(Max)
			if value == 0 {
				toStop <- "sender#" + id
				return
			}
...
				if value == Max-1 {
					toStop <- "receiver#" + id
					return
				}
...

The request is sent to toStop directly. Because toStop has room for one request from every sender and receiver, the send can never block, so the select statement with a default case is no longer needed.

As in the third case, dataCh is never actually closed here; once no goroutine references it any more, it is garbage collected like any other value.

These are the most basic cases, but they cover almost all the situations and variations you will meet. Just remember:

Don’t close a channel from the receiver side, and don’t close a channel if it has multiple concurrent senders.

And the more fundamental principle:

Don’t close (or send values to) a closed channel.
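Both rules exist because the runtime panics in these situations. The following sketch (the helper panics is hypothetical) recovers from each panic and prints the runtime's message, and also shows that receiving from a closed channel never panics:

```go
package main

import "fmt"

// panics is a hypothetical helper: it runs f and returns the panic message
// if f panicked, or "" if it returned normally.
func panics(f func()) (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	f()
	return ""
}

func main() {
	ch := make(chan int, 1)
	close(ch)

	// Closing an already-closed channel panics.
	fmt.Println(panics(func() { close(ch) })) // close of closed channel

	// Sending on a closed channel panics, even though the buffer has room.
	fmt.Println(panics(func() { ch <- 1 })) // send on closed channel

	// Receiving is always safe: the zero value and ok == false.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```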

Data can still be read from a closed channel

When you read from a buffered channel that has been closed, you still receive the valid values remaining in its buffer. A value read from it is invalid only when the returned ok is false.

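package main

import "fmt"

func main() {
	ch := make(chan int, 5)
	ch <- 18
	close(ch)

	// The buffered value is still delivered after close; ok is true.
	x, ok := <-ch
	if ok {
		fmt.Println("received: ", x)
	}

	// The buffer is now empty: x is the zero value and ok is false.
	x, ok = <-ch
	if !ok {
		fmt.Println("channel closed, data invalid.")
	}
}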

Running results:

received:  18
channel closed, data invalid.

We create a buffered channel, send it one element, and then close it. Both of the following reads succeed: the first returns 18 with ok set to true, because the value is still in the buffer. The second returns ok as false (and x as the zero value 0), indicating that the channel is closed and holds no more data.

Channel applications

The combination of channels and goroutines is Go's killer feature for concurrent programming, and channels show up in an impressive range of practical uses. Combined with select, cancellation, timers and so on, they can implement all kinds of functionality. Let's go through the common channel applications.

Stop signal

We covered how to gracefully close a channel at length in the previous section, so we'll keep this part short.

A channel can carry a stop signal in several ways, usually by closing the channel or by sending an element to it, so that the receiver learns about the event and can act on it, for example by stopping what it is doing.
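Closing a channel is the usual way to broadcast a stop signal, because every receiver waiting on a closed channel is released, whereas a single sent element wakes only one receiver. A minimal sketch, assuming a hypothetical helper stopAll and a done channel of type chan struct{}:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stopAll is a hypothetical helper: it starts n workers that each block
// until done is closed, and returns how many observed the stop signal.
func stopAll(n int) int64 {
	done := make(chan struct{})
	var stopped int64
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // blocks until done is closed
			atomic.AddInt64(&stopped, 1)
		}()
	}

	// Closing wakes every receiver at once; sending a single
	// struct{}{} on done would wake only one of them.
	close(done)
	wg.Wait()
	return atomic.LoadInt64(&stopped)
}

func main() {
	fmt.Println("workers stopped:", stopAll(4)) // workers stopped: 4
}
```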

Timed tasks

Combined with a timer, channels are typically used in two ways: to implement timeout control, and to run a task periodically.

Sometimes you need to perform an operation but don't want it to take too long. A timer handles this:

select {
case <-time.After(100 * time.Millisecond):
case <-s.stopc:
	return false
}

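This select waits for at most 100 ms. If s.stopc delivers a value or is closed first, the function returns false immediately; otherwise, after 100 ms the timer case fires and execution continues past the select. This example comes from the etcd source code, where the pattern appears all over the place.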

Running a task on a regular basis is just as easy:

import (
	"fmt"
	"time"
)

func worker() {
	ticker := time.Tick(1 * time.Second)
	for {
		select {
		case <-ticker:
			// Perform the scheduled task once per tick
			fmt.Println("Execute the scheduled task for 1s")
		}
	}
}

Here the scheduled task runs once every second.

Decouple producers and consumers

When the service starts, it launches n workers as a pool of worker goroutines. Each one runs in an infinite for {} loop, taking tasks from a channel and executing them:

package main

import (
	"fmt"
	"time"
)

func main() {
	taskCh := make(chan int, 100)
	go worker(taskCh)

	// Send tasks
	for i := 0; i < 10; i++ {
		taskCh <- i
	}

	// Wait 1 hour
	select {
	case <-time.After(time.Hour):
	}
}

func worker(taskCh <-chan int) {
	const N = 5
	// Start 5 worker goroutines
	for i := 0; i < N; i++ {
		go func(id int) {
			for {
				task := <-taskCh
				fmt.Printf("finish task: %d by worker %d\n", task, id)
				time.Sleep(time.Second)
			}
		}(i)
	}
}

The five worker goroutines keep taking tasks from the work queue, while the producer only has to send tasks into the channel. This decouples the producer from the consumers.

Program output (the order varies from run to run):

finish task: 1 by worker 4
finish task: 2 by worker 2
finish task: 4 by worker 3
finish task: 3 by worker 1
finish task: 0 by worker 0
finish task: 6 by worker 0
finish task: 8 by worker 3
finish task: 9 by worker 1
finish task: 7 by worker 4
finish task: 5 by worker 2
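The example above never stops its workers: main simply waits an hour. A common refinement, sketched here under the assumption that the producer knows when it has sent every task, is to close taskCh so that each worker's for-range loop ends, and to wait for the workers with sync.WaitGroup. runPool and process are hypothetical names:

```go
package main

import (
	"fmt"
	"sync"
)

// process is a hypothetical stand-in for real work: it squares the task.
func process(task int) int { return task * task }

// runPool starts n workers that consume tasks until taskCh is closed,
// and returns the sum of all results.
func runPool(n int, tasks []int) int {
	taskCh := make(chan int, len(tasks))
	results := make(chan int, len(tasks))
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// range exits once taskCh is closed and drained.
			for task := range taskCh {
				results <- process(task)
			}
		}()
	}

	// Producer: enqueue every task, then close to signal "no more tasks".
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh)

	wg.Wait()
	close(results)

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	tasks := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	fmt.Println("sum of squares:", runPool(5, tasks)) // sum of squares: 285
}
```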

Control concurrency

Sometimes hundreds of tasks need to be executed in one go, for example offline computing tasks that run once a day for each city. However, the concurrency must not be too high, because the tasks depend on third-party resources whose request rate is limited. In this case, you can use a channel to control the number of concurrent requests.

The following example comes from the book Advanced Go Programming:

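var limit = make(chan int, 3)

func main() {
	// ... (work is assumed to be a []func() of tasks, elided here)
	for _, w := range work {
		// Pass w as an argument so each goroutine runs its own task
		// (before Go 1.22, the loop variable was shared across iterations).
		go func(w func()) {
			limit <- 1 // acquire a "license"; blocks while 3 tasks are running
			w()
			<-limit // return the license
		}(w)
	}
	// ...
}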

We construct a buffered channel with capacity 3, then iterate over the task list and start a goroutine for each task. The actual work, accessing the third party, happens in w(). Before calling w(), the goroutine obtains a “license” from limit, and after the task completes, it returns the license. Because the channel holds at most 3 licenses, at most 3 goroutines can be running w() at the same time.

Note that limit <- 1 is placed inside the func, not outside it.

If it were placed in the outer loop, it would limit the number of goroutines created in the system, and it could block the for loop itself, which would affect the business logic.

limit has nothing to do with the business logic; it is purely performance tuning, and placing it inside or outside the func gives it different semantics.

One more thing to note: if w() panics, the “license” might never be returned, so you should use defer to guarantee that it is released.

Conclusion

That's it: you made it to the end, congratulations!

To recap, this article started with concurrency and parallelism and moved on to CSP, which Go implements with channels. It then explained what a channel is and why it is needed, and analyzed in detail how channels are implemented, which is the most important part of the article. After that, it presented several more advanced examples, and finally listed several channel application scenarios.

I hope this article helps you read the Go source code. This part of the source is not long; like the context package, it is short and concise, and well worth reading.

I have listed many articles and books in the resources section. Many of them are worth reading, and I mentioned them in the article.

Once you understand the underlying principles of channels, these English articles become interesting to read. You may once have felt daunted by them, but after understanding the principles, reading them is enjoyable, because you can really follow them.

Finally, happy reading!

Resources

【Concurrency in Go】github.com/arpitjindal…

【Advanced Go Programming, open-source book】chai2010.cn/advanced-go…

litang.me/post/golang…

【Chai Da & Cao Da, Advanced Go Programming】chai2010.cn/advanced-go…

【Go Concurrent Programming in Action】book.douban.com/subject/262…

【Cao Da's Golang Notes】github.com/cch123/gola…

【Internet Technology Nest: illustrated channel animations】mp.weixin.qq.com/s/40uxAPdub…

【Learn Golang together: recommended materials, very useful】segmentfault.com/a/119000001…

【How to gracefully close channels】go101.org/article/cha…

codeburst.io/diving-deep…

【Kavya's GopherCon talk on channel design, excellent】speakerd.s3.amazonaws.com/presentatio…

【Channel applications】www.s0nnet.com/archives/go…

【Examples】zhuyasen.com/post/go_que…

【Usage】tonybai.com/2014/09/29/…

【Birdnest's concurrent programming talk】colobu.com/2019/04/28/…

【Go-Questions, a Manong Taohuayuan project】github.com/qcrao/Go-Qu…

qcrao91.gitbook.io/go/