I have written about Go's goroutines before, but that discussion was fairly theoretical, especially where concurrency safety was concerned. So this time I combined a few online examples to try out a practical use of channel, select, and context in goroutines.
Scenario – Microservice invocation
We use Gin (a web framework) to handle requests. The requirements are as follows: a request X calls three methods A, B, and C in parallel, and the sum of the three results is returned as the response to X. The response is also time-limited: it must take no more than 5 seconds.
If one or two of A, B, and C take longer than expected, because the processing logic is too complicated or the amount of data is too large, we cut off immediately and return the sum of the results received so far.
Let’s define the main function first:
func main() {
	r := gin.New()
	r.GET("/calculate", calHandler)
	http.ListenAndServe(":8008", r)
}
Nothing special here: we register a route and its handler. calHandler is the function that processes the request.
Next, define three fake microservices. The third is the one that will time out:
func microService1() int {
	time.Sleep(1 * time.Second)
	return 1
}
func microService2() int {
	time.Sleep(2 * time.Second)
	return 2
}
func microService3() int {
	time.Sleep(10 * time.Second)
	return 3
}
Now let's look at what goes into calHandler:
func calHandler(c *gin.Context) {
	...
	c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
	return
}
A typical Gin response. Let's not worry about what sum is for now.
Point 1 – Concurrent calls
Just use the go keyword. A first attempt might look like this:
go microService1()
go microService2()
go microService3()
Easy, right? But wait: what happened to the return values we wanted? A goroutine started with go discards its return value, so to receive the results in parallel the natural choice is a channel. Let's call the services like this:
var resChan = make(chan int, 3) // Since there are three results, we create an int channel that can hold three values.
go func() {
	resChan <- microService1()
}()
go func() {
	resChan <- microService2()
}()
go func() {
	resChan <- microService3()
}()
Where something is sent, something has to receive it, so we add a loop that reads the results from resChan and adds them up:
var resContainer, sum int
for {
	resContainer = <-resChan
	sum += resContainer
}
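Each time a value comes out of resChan, it is added to sum. Note that this loop never exits on its own: once the last result has been read, the receive blocks forever.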
Point 2 – Timeout signal
We're not done yet. What about the timeout? To handle it we need to bring in context. We use only one feature of context here: timeout notification (which could also have been done with a plain channel).
You may have noticed that calHandler already receives c *gin.Context as a parameter, so we don't have to create a context ourselves. gin.Context is a context container that lives for the whole Gin request lifecycle, and because *gin.Context implements the context.Context interface, it can be passed wherever a context.Context is expected. It is a bit like a doppelgänger, or quantum entanglement.
With this gin.Context, a change made to the context in one place (a cancellation or a timeout, for example) is also seen by every other function or method that uses a context derived from it.
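The "entanglement" can be seen with the standard library alone. The sketch below uses context.Background() in place of gin.Context (an assumption made so the example runs without Gin), and childSeesCancel is a made-up name: cancelling the parent is observed through the child's Done channel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// childSeesCancel derives a child context from a parent, cancels only the
// parent, and reports whether the child's Done channel was closed as well.
func childSeesCancel() bool {
	parent, cancelParent := context.WithCancel(context.Background())
	child, cancelChild := context.WithTimeout(parent, time.Hour)
	defer cancelChild()

	cancelParent() // cancel the parent only

	select {
	case <-child.Done():
		return true // the child was cancelled along with its parent
	case <-time.After(time.Second):
		return false // safety net; not expected to be reached
	}
}

func main() {
	fmt.Println(childSeesCancel())
}
```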
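ctx, cancel := context.WithTimeout(c, 5*time.Second) // Define a context that times out after 5 seconds
defer cancel() // Release the context's resources when the handler returns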
As soon as the time is up, the channel returned by ctx.Done() is closed, and every other place that uses ctx can notice this and stop as well. ctx.Done() is generally used together with select, so we need another loop that listens on ctx.Done():
for {
	select {
	case <-ctx.Done():
		// Return the result
		return
	}
}
Now we have two for loops. Can we merge them?
for {
	select {
	case resContainer = <-resChan:
		sum += resContainer
		fmt.Println("add", resContainer)
	case <-ctx.Done():
		fmt.Println("result:", sum)
		return
	}
}
Hey, that looks good. But how do we output the result when all the microservice calls complete normally, before the timeout? It looks like we need another flag:
var count int
for {
	select {
	case resContainer = <-resChan:
		sum += resContainer
		count++
		fmt.Println("add", resContainer)
		if count > 2 {
			fmt.Println("result:", sum)
			return
		}
	case <-ctx.Done():
		fmt.Println("timeout result:", sum)
		return
	}
}
We added a counter. Because we call exactly three microservices, once count is greater than 2 every result has arrived, so we stop and print the result.
Point 3 – Waiting in concurrency
The counter above is a lazy approach that only works because we know how many microservices are called. What if we don't know that, or need to add more later? Manually changing the count threshold every time would be inelegant. This is where the sync package comes in. The feature of sync we will use is WaitGroup, which waits for a set of goroutines to complete before moving on to the next step.
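As a quick illustration of WaitGroup on its own, here is a minimal sketch. runAll is a made-up name, and the mutex-protected counter exists only so that the result can be checked; it is not part of the handler in this post.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll starts n goroutines, waits for all of them with a WaitGroup, and
// returns how many of them ran to completion.
func runAll(n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	finished := 0

	wg.Add(n) // register n tasks before starting them
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done() // mark this task as done
			mu.Lock()
			finished++
			mu.Unlock()
		}()
	}

	wg.Wait() // blocks until Done has been called n times
	return finished
}

func main() {
	fmt.Println(runAll(3)) // prints 3
}
```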
Let’s change the code block of the previous microservice call:
var success = make(chan int, 1) // Successful channel identifier
wg := sync.WaitGroup{}          // Create a waitGroup
wg.Add(3)                       // We add 3 tokens to the group because we are running 3 tasks
go func() {
	resChan <- microService1()
	wg.Done() // Do one, Done one
}()
go func() {
	resChan <- microService2()
	wg.Done()
}()
go func() {
	resChan <- microService3()
	wg.Done()
}()
wg.Wait()    // The program will block here until all three tokens are Done
success <- 1 // We send a success signal to the channel
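Note: if we put the code above directly into calHandler, there is a problem. wg.Wait() blocks the handler until all three services have finished, so the select loop that handles the timeout never gets a chance to run in the meantime. So let's pull the business logic above out into its own function: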
// rc is the result channel, and success is the success flag channel
func MyLogic(rc chan<- int, success chan<- int) {
	wg := sync.WaitGroup{} // Create a waitGroup
	wg.Add(3)              // We add 3 tokens to the group because we are running 3 tasks
	go func() {
		rc <- microService1()
		wg.Done() // Do one, Done one
	}()
	go func() {
		rc <- microService2()
		wg.Done()
	}()
	go func() {
		rc <- microService3()
		wg.Done()
	}()
	wg.Wait()    // The program will block here until all three tokens are Done
	success <- 1 // We send a success signal to the channel
}
Ultimately, MyLogic will run as a goroutine. (Thanks to @tomorrowwu and @chenqinghe for the reminder.)
Now that we have success, let's add it to the monitoring for loop and remove the original count check:
for {
	select {
	case resContainer = <-resChan:
		sum += resContainer
		fmt.Println("add", resContainer)
	case <-success:
		fmt.Println("result:", sum)
		return
	case <-ctx.Done():
		fmt.Println("result:", sum)
		return
	}
}
Three cases, with a clear division of labor:
case resContainer = <-resChan: receives each result from the business logic and adds it to the sum
case <-success: the normal output under ideal conditions
case <-ctx.Done(): the output in the timeout case
Let's polish it up a bit and replace the fmt.Println("result:", sum) in the last two cases with Gin's standard HTTP response:
c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
return
At this point, all the major code is complete. Here’s the full version
package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
)

// A request triggers calls to three services, each of which returns an int.
// The response to the request is the sum of the three ints.
// The request must return within 5 seconds. If it takes longer than that,
// only the sum of the ints received so far is returned.
func calHandler(c *gin.Context) {
	var resContainer, sum int
	// success is buffered so that MyLogic can still send its signal and exit
	// after a timeout, when the handler is no longer receiving.
	var success, resChan = make(chan int, 1), make(chan int, 3)
	ctx, cancel := context.WithTimeout(c, 5*time.Second)
	defer cancel()
	// Real business logic
	go MyLogic(resChan, success)
	for {
		select {
		case resContainer = <-resChan:
			sum += resContainer
			fmt.Println("add", resContainer)
		case <-success:
			c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
			return
		case <-ctx.Done():
			c.JSON(http.StatusOK, gin.H{"code": 200, "result": sum})
			return
		}
	}
}

func main() {
	r := gin.New()
	r.GET("/calculate", calHandler)
	http.ListenAndServe(":8008", r)
}

// rc is the result channel, and success is the success flag channel
func MyLogic(rc chan<- int, success chan<- int) {
	wg := sync.WaitGroup{} // Create a waitGroup
	wg.Add(3)              // We add 3 tokens to the group because we are running 3 tasks
	go func() {
		rc <- microService1()
		wg.Done() // Do one, Done one
	}()
	go func() {
		rc <- microService2()
		wg.Done()
	}()
	go func() {
		rc <- microService3()
		wg.Done()
	}()
	wg.Wait()    // The program will block here until all three tokens are Done
	success <- 1 // We send a success signal to the channel
}

func microService1() int {
	time.Sleep(1 * time.Second)
	return 1
}

func microService2() int {
	time.Sleep(2 * time.Second)
	return 2
}

func microService3() int {
	time.Sleep(6 * time.Second)
	return 3
}
The program above is only a simple sketch of handling timeouts when calling other microservices. In real code, a good deal more needs to be added to keep the interface robust for its callers.