I have written about Go’s goroutines before, but that discussion was fairly theoretical, especially where concurrency safety is concerned. So this time I worked through some examples found online to test how channel, select, and context can be combined inside goroutines.

Scenario – Microservice invocation

We use Gin (a web framework) to handle requests. The requirements are as follows: a request X calls three methods A, B, and C in parallel and returns the sum of their results as the response to X. The response is time-limited, however: it must be returned within 5 seconds.

If one or two of A, B, and C take longer than expected, because the processing logic is too complicated or the amount of data is too large, we cut them off immediately and return the sum of the results received so far.

Let’s define the main function first:

func main() {
	r := gin.New()
	r.GET("/calculate", calHandler)
	http.ListenAndServe(":8008", r)
}

Very simple: a plain-vanilla route registration and handler definition. calHandler is the function that processes the request.

Next, define three fake microservices; the third is the one that will time out:

func microService1() int {
	time.Sleep(1 * time.Second)
	return 1
}

func microService2() int {
	time.Sleep(2 * time.Second)
	return 2
}

func microService3() int {
	time.Sleep(10 * time.Second)
	return 3
}

So let’s see what’s in calHandler:

func calHandler(c *gin.Context) {
	// ...
	c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
	return
}

A typical Gin response. Let’s not worry about what sum is for now.

Point 1 – Concurrent calls

Just use the go keyword. At first we might write it like this:

go microService1()
go microService2()
go microService3()

Easy, right? But wait: what about the return values we are supposed to add up? To receive results from calls running in parallel, a channel is the natural choice. So let’s call the services like this:

var resChan = make(chan int, 3) // Since there are three results, we create an int channel that can hold three values.
go func() {
    resChan <- microService1()
}()

go func() {
    resChan <- microService2()
}()

go func() {
    resChan <- microService3()
}()

Now that results are arriving, something has to add them up, so we add a loop that reads from resChan and accumulates the total:

var resContainer, sum int
for {
    resContainer = <-resChan
    sum += resContainer
}

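Each time a value comes out of resChan, it is added to sum. Note that, as written, this loop never exits: after the third result it blocks forever on the empty channel.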
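To try the channel-based collection outside of Gin, here is a minimal sketch that assumes the number of calls is known up front, so the receiving loop can stop after one value per call. The names fanOutSum and service are made up for this illustration, and the sleeps are shortened to milliseconds.

```go
package main

import (
	"fmt"
	"time"
)

// service stands in for a microservice call: it waits for d, then returns v.
// (Hypothetical helper, not part of the article's code.)
func service(v int, d time.Duration) func() int {
	return func() int {
		time.Sleep(d)
		return v
	}
}

// fanOutSum starts every service in its own goroutine and adds up the results.
// The channel is buffered to len(services), so no sender ever blocks.
func fanOutSum(services []func() int) int {
	resChan := make(chan int, len(services))
	for _, s := range services {
		go func(call func() int) {
			resChan <- call()
		}(s)
	}

	sum := 0
	// Receive exactly one value per service, then stop.
	for range services {
		sum += <-resChan
	}
	return sum
}

func main() {
	services := []func() int{
		service(1, 10*time.Millisecond),
		service(2, 20*time.Millisecond),
		service(3, 30*time.Millisecond),
	}
	fmt.Println("sum:", fanOutSum(services))
}
```

Because the loop receives a fixed number of values, it waits for the slowest call; there is no timeout yet.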

Point 2 – Timeout signal

We’re not done yet: what about the timeout? To implement it we need to introduce context. What is context? Here we use only one of its features, timeout notification (which could also be done with a plain channel).

Notice that calHandler already receives c *gin.Context as an argument, so we don’t have to create a context ourselves. gin.Context is essentially a context container that lives for the whole request lifecycle in Gin, a bit like a doppelgänger or quantum entanglement. It implements the context.Context interface, so it can serve as the parent of a derived context.

With this gin.Context, we can act on the context in one place, and every other function or method that uses the same context will see the change.

ctx, cancel := context.WithTimeout(c, 5*time.Second) // Define a context that times out after 5 seconds
defer cancel() // Release the timer once the handler returns

As soon as the time is up, the channel returned by ctx.Done() is closed, and every other place that listens on ctx is notified and can stop and release its resources. ctx.Done() is generally used together with select, so we need another loop that listens on ctx.Done():

for {
    select {
    case <-ctx.Done():
        // Return the result
        return
    }
}

Now we have two for loops. Can we merge them?

for {
    select {
    case resContainer = <-resChan:
        sum += resContainer
        fmt.Println("add", resContainer)
    case <-ctx.Done():
        fmt.Println("result:", sum)
        return
    }
}

Hey, that looks good. But how do we output the result when all the microservice calls complete normally? It looks like we need another flag:

var count int
for {
    select {
    case resContainer = <-resChan:
        sum += resContainer
        count++
        fmt.Println("add", resContainer)
        if count > 2 {
            fmt.Println("result:", sum)
            return
        }
    case <-ctx.Done():
        fmt.Println("timeout result:", sum)
        return
    }
}

We added a counter: since we call exactly three microservices, once count is greater than 2 we stop and output the result.

Point 3 – Waiting in concurrency

The counter above is a lazy approach that works only because we know how many microservices are called. What if we don’t know, or need to add more later? Manually changing the count threshold every time would be inelegant. This is where the sync package comes in. The feature we will use is WaitGroup, which waits for a set of goroutines to complete before moving on to the next step.

Let’s change the code block that calls the microservices:

var success = make(chan int, 1) // Success signal channel
wg := sync.WaitGroup{} // Create a WaitGroup
wg.Add(3) // Add 3 to the counter because we are running 3 tasks
go func() {
    resChan <- microService1()
    wg.Done() // Finish one task, mark one as done
}()

go func() {
    resChan <- microService2()
    wg.Done()
}()

go func() {
    resChan <- microService3()
    wg.Done()
}()
wg.Wait() // Blocks here until all three tasks have called Done
success <- 1 // Send a success signal to the channel

Note: if we put the code above directly into calHandler, wg.Wait() blocks the handler, so it cannot reach the loop that produces the response until every service has finished. So let’s pull the business logic out into its own function:

// rc is the result channel, and success is the success flag channel
func MyLogic(rc chan<- int, success chan<- int) {
	wg := sync.WaitGroup{} // Create a WaitGroup
	wg.Add(3) // Add 3 to the counter because we are running 3 tasks
	go func() {
		rc <- microService1()
		wg.Done() // Finish one task, mark one as done
	}()

	go func() {
		rc <- microService2()
		wg.Done()
	}()

	go func() {
		rc <- microService3()
		wg.Done()
	}()

	wg.Wait() // Blocks here until all three tasks have called Done
	success <- 1 // Send a success signal to the channel
}

Ultimately, MyLogic runs as a goroutine of its own. (Thanks to @tomorrowwu and @chenqinghe for the reminder.)
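MyLogic still hard-codes the number 3 in wg.Add. As a sketch of how the WaitGroup removes the need to know the count, the variant below takes a slice of service functions and sizes everything from its length. runAll, the service closures, and the struct{} signal type are assumptions made for this example, not the article's code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runAll starts every service in its own goroutine, forwards each result to rc,
// and signals success once all of them have finished. The WaitGroup counter is
// taken from len(services), so adding a service needs no other change.
func runAll(services []func() int, rc chan<- int, success chan<- struct{}) {
	var wg sync.WaitGroup
	wg.Add(len(services))
	for _, s := range services {
		go func(call func() int) {
			defer wg.Done() // runs after the result has been sent
			rc <- call()
		}(s)
	}
	wg.Wait()
	success <- struct{}{}
}

func main() {
	services := []func() int{
		func() int { time.Sleep(10 * time.Millisecond); return 1 },
		func() int { time.Sleep(20 * time.Millisecond); return 2 },
		func() int { return 3 },
		func() int { return 4 },
	}
	rc := make(chan int, len(services)) // buffered: senders never block
	success := make(chan struct{}, 1)   // buffered: runAll never blocks on the signal

	go runAll(services, rc, success)

	<-success // every result has been sent into rc's buffer by now
	sum := 0
	for len(rc) > 0 {
		sum += <-rc
	}
	fmt.Println("sum:", sum)
}
```

The main function here simply waits for the success signal and then empties the buffer; it has no timeout and is only meant to show the WaitGroup part in isolation.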

Now that we have success, let’s add it to the for loop that monitors the channels and remove the count check:

for {
	select {
	case resContainer = <-resChan:
		sum += resContainer
		fmt.Println("add", resContainer)
	case <-success:
		// All results are already in resChan's buffer; add any not yet read
		for len(resChan) > 0 {
			sum += <-resChan
		}
		fmt.Println("result:", sum)
		return
	case <-ctx.Done():
		fmt.Println("result:", sum)
		return
	}
}

Three cases, with a clear division of labor:

case resContainer = <-resChan: takes each result from the business logic and adds it to the sum

case <-success: the normal output under ideal conditions

case <-ctx.Done(): the output in the timeout case

Let’s polish it up a bit and replace the fmt.Println("result:", sum) in the last two cases with Gin’s standard HTTP response:

c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
return

At this point, all the main code is complete. Here’s the full version:

package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
)

// A request triggers the invocation of three services, each of which returns an int.
// The response is the sum of the three ints.
// The request must return within 5 seconds; if it takes longer, only the sum of the ints received so far is returned.
func calHandler(c *gin.Context) {
	var resContainer, sum int
	// Both channels are buffered so that MyLogic and its goroutines never block
	// on a send after the handler has returned.
	var success, resChan = make(chan int, 1), make(chan int, 3)
	ctx, cancel := context.WithTimeout(c, 5*time.Second)
	defer cancel()

	// Real business logic
	go MyLogic(resChan, success)

	for {
		select {
		case resContainer = <-resChan:
			sum += resContainer
			fmt.Println("add", resContainer)
		case <-success:
			// All results are already in resChan's buffer; add any not yet read
			for len(resChan) > 0 {
				sum += <-resChan
			}
			c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
			return
		case <-ctx.Done():
			c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
			return
		}
	}
}

func main() {
	r := gin.New()
	r.GET("/calculate", calHandler)

	http.ListenAndServe(":8008", r)
}

func MyLogic(rc chan<- int, success chan<- int) {
	wg := sync.WaitGroup{} // Create a WaitGroup
	wg.Add(3) // Add 3 to the counter because we are running 3 tasks
	go func() {
		rc <- microService1()
		wg.Done() // Finish one task, mark one as done
	}()

	go func() {
		rc <- microService2()
		wg.Done()
	}()

	go func() {
		rc <- microService3()
		wg.Done()
	}()

	wg.Wait() // Blocks here until all three tasks have called Done
	success <- 1 // Send a success signal to the channel
}

func microService1() int {
	time.Sleep(1 * time.Second)
	return 1
}

func microService2() int {
	time.Sleep(2 * time.Second)
	return 2
}

func microService3() int {
	time.Sleep(6 * time.Second)
	return 3
}

The program above is only a simple sketch of handling timeouts when calling other microservices. In real-world use, much more needs to be added to keep the interface robust for its callers.