Goroutines

The goroutine is the basic unit of execution in Go. Every Go program has at least one goroutine, the main goroutine, which is created automatically when the program starts.

Goroutines follow a fork-join model: the go statement forks a new goroutine that runs concurrently with its parent.

sayHello := func() {
	fmt.Println("hello")
}
go sayHello()

So how do we join the goroutine? By introducing a wait operation, here a sync.WaitGroup:

var wg sync.WaitGroup
sayHello := func() {
	defer wg.Done()
	fmt.Println("hello")
}

wg.Add(1)
go sayHello()
wg.Wait() // Join point: blocks until sayHello has called Done.
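
The same join works for any number of goroutines. The following is a minimal sketch, not taken from the original sources: squareAll is a hypothetical helper that forks one goroutine per input value and uses a single WaitGroup as the join point.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll is a hypothetical helper: it forks one goroutine per input
// value and joins all of them with a WaitGroup before returning.
func squareAll(nums []int) []int {
	results := make([]int, len(nums))
	var wg sync.WaitGroup
	for i, n := range nums {
		wg.Add(1) // register the goroutine before starting it
		go func(i, n int) {
			defer wg.Done()
			results[i] = n * n // each goroutine writes only its own slot
		}(i, n)
	}
	wg.Wait() // join point: blocks until every goroutine has called Done
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // [1 4 9 16]
}
```

Add is called in the parent goroutine, before the go statement, so Wait cannot run before the counter has been incremented.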

Channel

Read/write channel

The goroutine is the basic unit of scheduling in Go, and channels are the communication mechanism between goroutines. In a channel type, the <- operator specifies the direction: send-only or receive-only. If no direction is given, the channel is bidirectional.

// Create a two-way channel
ch := make(chan interface{})

The element type interface{} means that the channel can carry values of any type.

A channel has two main operations, send and receive, and both use the <- operator. In a send statement, the channel is on the left of the <- operator and the value to send is on the right. In a receive expression, the channel is on the right of the <- operator. A receive expression whose result is not used is also a valid statement.

// Send operation
ch <- x 
// Receive operation
x = <-ch 
// Ignore the received value
<-ch     

The compiler rejects any operation that goes against a channel’s direction:

writeStream := make(chan<- interface{})
readStream := make(<-chan interface{})

<-writeStream
readStream <- struct{}{}

The statements above produce the following compile errors:

invalid operation: <-writeStream (receive from send-only type chan<- interface {})
invalid operation: readStream <- struct {} literal (send to receive-only type <-chan interface {})

Closing a channel

A channel supports the close operation. Any subsequent send on a closed channel causes a panic. Receive operations on a closed channel still yield the values that were sent before the close; once those are drained, every further receive returns immediately with the zero value of the element type.

Read from a closed channel:

intStream := make(chan int)
close(intStream)
integer, ok := <-intStream
fmt.Printf("(%v): %v", ok, integer)
// (false): 0
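
To illustrate that values sent before the close are still delivered, here is a small sketch that closes a buffered channel with two pending values. The drain helper is hypothetical and exists only for this illustration.

```go
package main

import "fmt"

// drain is a hypothetical helper: it receives from ch until the channel
// reports that it is closed and empty, then returns the values it saw
// together with the final (zero value, ok) pair.
func drain(ch <-chan int) (values []int, last int, ok bool) {
	for {
		v, open := <-ch
		if !open {
			return values, v, open
		}
		values = append(values, v)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 10
	ch <- 20
	close(ch) // no more sends are allowed, but buffered values stay readable

	values, last, ok := drain(ch)
	fmt.Println(values, last, ok) // [10 20] 0 false
}
```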

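In the example above, the second return value, ok, is false, which tells us that the channel is closed and empty. A range loop handles closed channels more elegantly, because it ends automatically once the channel has been closed and drained: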

intStream := make(chan int)
go func() {
	defer close(intStream)
	for i := 1; i <= 5; i++ {
		intStream <- i
	}
}()

for integer := range intStream {
	fmt.Printf("%v ", integer)
}
// 1 2 3 4 5

Buffered channels

Create a buffered Channel that can hold three string elements:

ch = make(chan string, 3)

We can send three consecutive values to the newly created channel without blocking:

ch <- "A"
ch <- "B"
ch <- "C"

At this point, the channel’s internal buffer is full, and a fourth send would block.

If we receive a value:

fmt.Println(<-ch) // "A"

Now the channel’s buffer is neither full nor empty, so neither a send nor a receive on it will block. In this way, the buffer decouples the sending and receiving goroutines.
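
The built-in functions len and cap make the state of the buffer visible. The sketch below replays the steps above; the state helper is a hypothetical name introduced only for this example.

```go
package main

import "fmt"

// state is a hypothetical helper that reports how many values are
// currently buffered in ch and how many the buffer can hold.
func state(ch chan string) (buffered, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	ch := make(chan string, 3)
	ch <- "A"
	ch <- "B"
	ch <- "C"
	fmt.Println(state(ch)) // 3 3: the buffer is full

	fmt.Println(<-ch)      // A
	fmt.Println(state(ch)) // 2 3: one slot is free again
}
```

In a concurrent program the value of len may be stale by the time it is read, so it is better suited to diagnostics than to control flow.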

Buffered channels can be used as semaphores, for example to limit throughput. In the example below, incoming requests are passed to handle, which sends a value into the channel, processes the request, and then receives a value from the channel to free the slot for the next request. The capacity of the channel buffer limits the number of simultaneous calls to process.

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1   // Wait for the active queue to drain.
    process(r) // May take a long time.
    <-sem      // Done; let the next request run.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req) // Don't wait for handle to finish.
    }
}

This design has a problem, though: Serve creates a new goroutine for every incoming request, even though only MaxOutstanding of them can run at any moment. As a result, the program can consume unlimited resources if requests arrive too fast. We can address this by changing Serve to gate the creation of goroutines. Here is an obvious solution, but note that it contains a bug, which is fixed further on:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // Buggy before Go 1.22: req is shared by all goroutines.
            <-sem
        }()
    }
}

The bug is that, in Go versions before 1.22, the loop variable of a for loop is reused on each iteration, so the req variable is shared across all goroutines, which is not what we want. (Since Go 1.22, each iteration gets its own copy of the variable and the problem no longer occurs.) We need to make sure that req is unique for each goroutine. One way to do that is to pass the value of req as an argument to the goroutine’s closure:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}

Another solution is to create a new variable with the same name, as shown in the example:

func Serve(queue chan *Request) {
    for req := range queue {
        req := req // Create a new instance of req for the goroutine.
        sem <- 1
        go func() {
            process(req)
            <-sem
        }()
    }
}
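
To see the semaphore at work, here is a self-contained sketch under stated assumptions: runLimited is a hypothetical function, the 5 ms sleep stands in for process, and the mutex-protected counters exist only to observe how many tasks run at once. It acquires the slot before starting each goroutine, as the corrected Serve does.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited is a hypothetical helper: it runs `jobs` simulated tasks
// with at most `limit` of them active at once, and returns the highest
// number of simultaneously active tasks it observed.
func runLimited(jobs, limit int) int {
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // acquire a slot before creating the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when done

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(5 * time.Millisecond) // stands in for process(req)

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	peak := runLimited(10, 3)
	fmt.Println(peak <= 3) // true
}
```

Because a slot is taken before the go statement, the loop itself blocks when the limit is reached, so the number of live goroutines stays bounded as well.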

Let’s look at another example, this one from The Go Programming Language (gopl). It sends requests concurrently to three mirror sites in different geographical locations. Each goroutine sends its response on a buffered channel, and the receiver takes only the first response to arrive, which is the fastest one. So mirroredQuery may return a result before the two slower mirrors have even responded.

func mirroredQuery() string {
    responses := make(chan string, 3)
    go func() { responses <- request("asia.gopl.io") }()
    go func() { responses <- request("europe.gopl.io") }()
    go func() { responses <- request("americas.gopl.io") }()
    // Return only the quickest response.
    return <-responses
}

func request(hostname string) (response string) { /* ... */ }

Had we used an unbuffered channel, the two slower goroutines would be stuck forever trying to send their responses on a channel from which no goroutine will ever receive. This situation, called a goroutine leak, would be a bug. Unlike garbage variables, leaked goroutines are not collected automatically, so it is important to make sure that every goroutine terminates once it is no longer needed.

Channels of channels

One of the most important properties of Go is that a channel is a first-class value that can be allocated and passed around like any other. A common use of this property is to implement safe, parallel demultiplexing.

We can use this feature to implement a simple RPC. The following is a rough definition of the Request type.

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}

The client provides a function and argument, as well as a channel in the request object to receive the response.

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send the request
clientRequests <- request
// Wait for a response
fmt.Printf("answer: %d\n", <-request.resultChan)

Server-side handler functions:

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}
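
Putting the client and server fragments together gives a small runnable program. This is a sketch: the call helper is a hypothetical wrapper around the client steps, and the way main wires up clientRequests is an assumption, since the source does not show where that channel is created.

```go
package main

import "fmt"

// Request carries the arguments, the function to apply, and the
// channel on which the server sends back the result.
type Request struct {
	args       []int
	f          func([]int) int
	resultChan chan int
}

func sum(a []int) (s int) {
	for _, v := range a {
		s += v
	}
	return
}

// handle is the server side: it answers each request on the reply
// channel that the request itself carries.
func handle(queue chan *Request) {
	for req := range queue {
		req.resultChan <- req.f(req.args)
	}
}

// call is a hypothetical client helper: it sends one request and
// waits for the answer on that request's private channel.
func call(queue chan *Request, f func([]int) int, args ...int) int {
	req := &Request{args, f, make(chan int)}
	queue <- req
	return <-req.resultChan
}

func main() {
	clientRequests := make(chan *Request)
	go handle(clientRequests)

	fmt.Printf("answer: %d\n", call(clientRequests, sum, 3, 4, 5)) // answer: 12
	close(clientRequests) // lets handle's range loop finish
}
```

Because every request brings its own reply channel, many clients can share one request queue without their answers getting mixed up.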

Channel pipelines

Channels can also be used to connect goroutines together, so that the output of one goroutine is the input to the next. Such a chain is called a pipeline. The following program uses two channels to connect three goroutines:

The first goroutine is a counter that generates the integers 0, 1, 2, … and sends them over a channel to the second goroutine. The second goroutine is a squarer that squares each integer it receives and sends the result over a second channel to the third goroutine. The third goroutine is a printer that prints every integer it receives.

func counter(out chan<- int) {
	for x := 0; x < 100; x++ {
		out <- x
	}
	close(out)
}

func squarer(out chan<- int, in <-chan int) {
	for v := range in {
		out <- v * v
	}
	close(out)
}

func printer(in <-chan int) {
	for v := range in {
		fmt.Println(v)
	}
}

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	go counter(naturals)
	go squarer(squares, naturals)
	printer(squares)
}

Select multiplexing

A select statement chooses one of a set of possible communications to proceed. If several of them can proceed, one is chosen at random and its statements are executed. Otherwise, if there is a default case, that case runs; if there is none, the select statement blocks until one of the communications can proceed.

select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

A select statement can also put a time limit on an operation. The code below prints either the value of news or a timeout message, depending on which of the two receive cases becomes ready first:

select {
case news := <-NewsAgency:
    fmt.Println(news)
case <-time.After(time.Minute):
    fmt.Println("Time out: no news in one minute.")
}

The following select statement receives a value from the abort channel if there is one to receive; otherwise it does nothing. This is a non-blocking receive operation; doing it repeatedly is called polling a channel.

select {
case <-abort:
    fmt.Printf("Launch aborted!\n")
    return
default:
    // do nothing
}
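
The same non-blocking receive can be written as a reusable function. The tryReceive helper below is a hypothetical name used for illustration.

```go
package main

import "fmt"

// tryReceive is a hypothetical helper: it performs a non-blocking
// receive and reports whether a receive actually took place.
func tryReceive(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default: // nothing ready: return at once instead of blocking
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryReceive(ch)) // 0 false

	ch <- 7
	fmt.Println(tryReceive(ch)) // 7 true
}
```

Note that a receive on a closed channel always succeeds, so on a closed, empty channel this helper would report a zero value as received; code that needs to tell the cases apart should use the two-value receive form inside the case.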

Resources

  1. Concurrency in Go
  2. The Go Programming Language (gopl)
  3. Effective Go