Preface

As mentioned in my previous post on writing a lightweight SSH batch-operation tool in Go, we need to limit how many goroutines run concurrently and put a timeout on each goroutine’s execution. I didn’t go into detail then, so I’ll cover it here.

All of the sample code below can be run directly on the Go Playground.

Concurrency

Let’s start with a simple concurrent program:

package main

import (
    "fmt"
    "time"
)

// run sleeps for sleeptime seconds, then reports back on ch.
func run(task_id, sleeptime int, ch chan string) {
    time.Sleep(time.Duration(sleeptime) * time.Second)
    ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
    return
}

func main() {
    input := []int{3, 2, 1}
    ch := make(chan string)
    startTime := time.Now()
    fmt.Println("Multirun start")
    // Start one goroutine per task.
    for i, sleeptime := range input {
        go run(i, sleeptime, ch)
    }
    // Receive one result per task, in the order the tasks finish.
    for range input {
        fmt.Println(<-ch)
    }
    endTime := time.Now()
    fmt.Printf("Multissh finished. Process time %s. Number of tasks is %d", endTime.Sub(startTime), len(input))
}

The function run() sleeps for sleeptime seconds and then sends a result string to the channel ch. In main(), each call is started concurrently with the go keyword, and main collects the results by receiving from ch.

A channel, as the name implies, is a conduit through which goroutines communicate. Goroutines do not read and write shared variables directly. Instead, one goroutine sends a value into the channel and another receives it, so data is exchanged safely without explicit locks.

ch <- xxx // writes data to the channel
<-ch      // reads data from the channel
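The two operations above can be seen end to end in the minimal sketch below. Some parts go beyond the article. The directional channel types (chan<- for send-only, <-chan for receive-only), close() and ranging over a channel are standard Go. The helpers produce and collect are hypothetical names used only for illustration.

```go
package main

import "fmt"

// produce is a hypothetical helper: it sends n messages on out, then closes it.
// The chan<- type means this function may only send on the channel.
func produce(n int, out chan<- string) {
    for i := 0; i < n; i++ {
        out <- fmt.Sprintf("message %d", i) // write to the channel
    }
    close(out) // tell the receiver no more values are coming
}

// collect is a hypothetical helper: it reads values until the channel is closed.
// The <-chan type means this function may only receive from the channel.
func collect(in <-chan string) []string {
    var got []string
    for msg := range in { // each iteration performs a receive (<-in)
        got = append(got, msg)
    }
    return got
}

func main() {
    ch := make(chan string)
    go produce(3, ch)
    fmt.Println(collect(ch)) // [message 0 message 1 message 2]
}
```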

Channels can be either unbuffered or buffered. For example, the following creates an unbuffered channel:

ch := make(chan string)

We’ll come back to channel buffering later. First, let’s look at the output:

Multirun start
task id 2 , sleep 1 second
task id 1 , sleep 2 second
task id 0 , sleep 3 second
Multissh finished. Process time 3s. Number of tasks is 3
Program exited.

The three goroutines slept for 3, 2 and 1 seconds respectively, yet the whole run took only 3 seconds, which is the duration of the slowest task. So the tasks really did run concurrently. Concurrency in Go is that simple.

Returning results in order

In the previous example, the tasks were started in the order 0, 1, 2, but the results came back from the channel in the order 2, 1, 0. A single shared channel delivers results in the order the tasks finish. Task 2 (1 second) is the fastest and task 0 (3 seconds) is the slowest.

What if we want the results in the order the tasks were started? One way is to use an array (well, a slice) of channels, with one channel per task, and read them in order:

package main

import (
    "fmt"
    "time"
)

func run(task_id, sleeptime int, ch chan string) {

    time.Sleep(time.Duration(sleeptime) * time.Second)
    ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
    return
}

func main() {
    input := []int{3, 2, 1}
    chs := make([]chan string, len(input))
    startTime := time.Now()
    fmt.Println("Multirun start")
    for i, sleeptime := range input {
        chs[i] = make(chan string)
        go run(i, sleeptime, chs[i])
    }

    for _, ch := range chs {
        fmt.Println(<-ch)
    }

    endTime := time.Now()
    fmt.Printf("Multissh finished. Process time %s. Number of tasks is %d", endTime.Sub(startTime), len(input))
}

When we run it, the order of the output now matches the order of the input:

Multirun start
task id 0 , sleep 3 second
task id 1 , sleep 2 second
task id 2 , sleep 1 second
Multissh finished. Process time 3s. Number of tasks is 3
Program exited.

Timeout control

The examples so far don’t handle timeouts. If a goroutine takes too long, or never finishes, the main goroutine blocks forever waiting to receive from its channel, and the whole program hangs. So we need timeout control.

A common way to add a timeout is to combine select with time.After. For example, we add a wrapper function Run() that starts go run() and then uses select. Whichever arrives first decides whether the task succeeded or timed out: the result from run() or the signal from time.After.

package main

import (
    "fmt"
    "time"
)

func Run(task_id, sleeptime, timeout int, ch chan string) {
    ch_run := make(chan string, 1) // buffered so run() can exit even after a timeout
    go run(task_id, sleeptime, ch_run)
    select {
    case re := <-ch_run:
        ch <- re
    case <-time.After(time.Duration(timeout) * time.Second):
        re := fmt.Sprintf("task id %d , timeout", task_id)
        ch <- re
    }
}

func run(task_id, sleeptime int, ch chan string) {

    time.Sleep(time.Duration(sleeptime) * time.Second)
    ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
    return
}

func main() {
    input := []int{3, 2, 1}
    timeout := 2
    chs := make([]chan string, len(input))
    startTime := time.Now()
    fmt.Println("Multirun start")
    for i, sleeptime := range input {
        chs[i] = make(chan string)
        go Run(i, sleeptime, timeout, chs[i])
    }

    for _, ch := range chs {
        fmt.Println(<-ch)
    }
    endTime := time.Now()
    fmt.Printf("Multissh finished. Process time %s. Number of task is %d", endTime.Sub(startTime), len(input))
}


Result: task 0 and task 1 time out, and the whole run takes 2 seconds.

Multirun start
task id 0 , timeout
task id 1 , timeout
task id 2 , sleep 1 second
Multissh finished. Process time 2s. Number of task is 3
Program exited.

Concurrency limit

If there are many tasks and a goroutine is started for each one without any limit, resource usage can grow until it overwhelms the server. In real-world code, concurrency therefore has to be limited.

One common approach is channel buffering, which we mentioned at the beginning.

Here is how to create a channel without and with a buffer:

ch := make(chan string)    // unbuffered
ch := make(chan string, 1) // buffered, with room for 1 value

The difference is in when a send blocks. A send on an unbuffered channel, or on a buffered channel whose buffer is full, blocks the sending goroutine until another goroutine receives from the channel. For example:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string)
    ch <- "123"
    fmt.Println(<-ch)
}

Running this code produces an error:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /tmp/sandbox531498664/main.go:9 +0x60

Program exited.

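This is because ch is an unbuffered channel. The send ch <- "123" blocks the main goroutine until someone receives the value, so the following fmt.Println(<-ch) is never reached. No other goroutine exists to receive it, so the runtime sees that all goroutines are asleep and reports a deadlock.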
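An unbuffered channel does work when the send and the receive happen in different goroutines. The minimal sketch below illustrates this. The helper handoff is a hypothetical name.

```go
package main

import "fmt"

// handoff sends msg on an unbuffered channel from a separate goroutine,
// so the calling goroutine is free to reach the receive and complete it.
func handoff(msg string) string {
    ch := make(chan string)
    go func() {
        ch <- msg // blocks only until the receive below runs
    }()
    return <-ch
}

func main() {
    fmt.Println(handoff("123")) // 123
}
```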

If we give ch a buffer of 1, the program runs:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 1)
    ch <- "123"
    fmt.Println(<-ch)
}

Output:

123

Program exited.

But if we send twice, like this:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 1)
    ch <- "123"
    ch <- "123"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

Even though the code reads from the channel twice, it still deadlocks. The first send fills the buffer (capacity 1), so the second ch <- "123" blocks the main goroutine before either receive can run.

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /tmp/sandbox642690323/main.go:10 +0x80

Program exited.
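You can test whether a send would block, without actually blocking, by using select with a default branch. This standard Go idiom is not used in the article. The sketch below shows that an unbuffered channel with no receiver refuses the send and a capacity-1 buffer accepts exactly one value. A capacity-2 buffer accepts both sends from the failing example. The helper trySend is a hypothetical name.

```go
package main

import "fmt"

// trySend attempts a send without blocking. select falls through to default
// exactly when a plain ch <- v would block: the channel is unbuffered with no
// receiver ready, or its buffer is full.
func trySend(ch chan string, v string) bool {
    select {
    case ch <- v:
        return true
    default:
        return false
    }
}

func main() {
    unbuffered := make(chan string)
    fmt.Println(trySend(unbuffered, "123")) // false: nobody is receiving

    one := make(chan string, 1)
    fmt.Println(trySend(one, "123"), trySend(one, "123")) // true false: the second send finds the buffer full

    two := make(chan string, 2)
    fmt.Println(trySend(two, "123"), trySend(two, "123")) // true true: room for both
    fmt.Println(<-two, <-two)                             // 123 123
}
```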

This blocking behavior is what lets us implement a concurrency limit with a buffered channel. Before starting each goroutine, we write a value into a buffered channel (the value itself doesn’t matter), and each goroutine reads one value back out when it finishes. A send blocks once the buffer is full, so the buffer size caps how many goroutines can run at the same time.
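The idea can be checked in isolation with the sketch below. It counts how many tasks are running at once and records the peak, which should never exceed the buffer size. Several parts are assumptions, not the article’s code. It uses chan struct{} instead of chan bool, a common idiom because the values carry no data. It waits for completion with sync.WaitGroup, and peakConcurrency is a hypothetical helper.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// peakConcurrency is a hypothetical helper: it runs n short tasks while
// allowing at most limit of them at once, and returns the largest number
// that were ever running at the same time.
func peakConcurrency(n, limit int) int64 {
    sem := make(chan struct{}, limit) // struct{} values carry no data
    var running, peak int64
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        sem <- struct{}{} // acquire: blocks while limit tasks hold a slot
        wg.Add(1)
        go func() {
            defer wg.Done()
            cur := atomic.AddInt64(&running, 1)
            // Raise peak to cur if cur is larger (compare-and-swap loop).
            for {
                old := atomic.LoadInt64(&peak)
                if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
                    break
                }
            }
            time.Sleep(5 * time.Millisecond) // simulated work
            atomic.AddInt64(&running, -1)
            <-sem // release the slot only after the task is done
        }()
    }
    wg.Wait()
    return atomic.LoadInt64(&peak)
}

func main() {
    fmt.Println(peakConcurrency(10, 3))
}
```

The exact peak depends on scheduling, but it can never exceed limit. A new task starts only after the main goroutine has put a value into sem, and a slot is freed only after the finishing task has decremented running.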

For example, we can use a buffered channel of bool as the counter for the concurrency limit:

    chLimit := make(chan bool, 1)

Then, each time we start a new goroutine, we first put a value into chLimit:

    for i, sleeptime := range input {
        chs[i] = make(chan string, 1)
        chLimit <- true
        go limitFunc(chLimit, chs[i], i, sleeptime, timeout)
    }

Each goroutine now runs a new function, limitFunc, started with the go keyword. After the original Run() returns, limitFunc takes one value out of chLimit, freeing a slot in the buffer:

    limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {
        Run(task_id, sleeptime, timeout, ch)
        <-chLimit
    }

This way, once the number of running goroutines reaches the size of chLimit’s buffer, the main goroutine blocks on chLimit <- true. When a running goroutine finishes and removes a value from chLimit, the main goroutine resumes and starts the next one. This achieves our goal of limiting the number of concurrent tasks.

Here is the complete code

package main

import (
    "fmt"
    "time"
)

func Run(task_id, sleeptime, timeout int, ch chan string) {
    ch_run := make(chan string, 1) // buffered so run() can exit even after a timeout
    go run(task_id, sleeptime, ch_run)
    select {
    case re := <-ch_run:
        ch <- re
    case <-time.After(time.Duration(timeout) * time.Second):
        re := fmt.Sprintf("task id %d , timeout", task_id)
        ch <- re
    }
}

func run(task_id, sleeptime int, ch chan string) {

    time.Sleep(time.Duration(sleeptime) * time.Second)
    ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
    return
}

func main() {
    input := []int{3, 2, 1}
    timeout := 2
    chLimit := make(chan bool, 1)
    chs := make([]chan string, len(input))
    limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {
        Run(task_id, sleeptime, timeout, ch)
        <-chLimit
    }
    startTime := time.Now()
    fmt.Println("Multirun start")
    for i, sleeptime := range input {
        chs[i] = make(chan string, 1)
        chLimit <- true
        go limitFunc(chLimit, chs[i], i, sleeptime, timeout)
    }

    for _, ch := range chs {
        fmt.Println(<-ch)
    }
    endTime := time.Now()
    fmt.Printf("Multissh finished. Process time %s. Number of task is %d", endTime.Sub(startTime), len(input))
}

The results

Multirun start
task id 0 , timeout
task id 1 , timeout
task id 2 , sleep 1 second
Multissh finished. Process time 5s. Number of task is 3
Program exited.

chLimit has a buffer of 1, so the tasks run one at a time. Task 0 and task 1 each time out after 2 seconds, and task 2 takes 1 second, for a total of 5 seconds. The concurrency limit is in effect.

If we raise the concurrency limit to 2:

chLimit := make(chan bool, 2)

The results

Multirun start
task id 0 , timeout
task id 1 , timeout
task id 2 , sleep 1 second
Multissh finished. Process time 3s. Number of task is 3
Program exited.

Task 0 and task 1 run concurrently and both time out after 2 seconds. Task 2 then takes 1 second, for a total of 3 seconds, as expected.
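A fixed worker pool is another common way to cap concurrency. It is a different design from the article’s chLimit approach. Instead of gating goroutine creation, a fixed number of worker goroutines pull jobs from a shared channel, so no more than that many tasks can ever run at once. The sketch below is illustrative only. The job type and the squareAll helper are hypothetical, squaring stands in for real work, and it assumes workers is at least 1.

```go
package main

import (
    "fmt"
    "sync"
)

// job is a hypothetical unit of work: a task index plus its input.
type job struct {
    id    int
    input int
}

// squareAll processes inputs with exactly `workers` goroutines, which pull
// jobs from a shared channel until it is closed. Results keep input order.
func squareAll(inputs []int, workers int) []int {
    jobs := make(chan job)
    results := make([]int, len(inputs)) // each job writes only its own slot
    var wg sync.WaitGroup
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs { // loop ends when jobs is closed
                results[j.id] = j.input * j.input
            }
        }()
    }
    for i, v := range inputs {
        jobs <- job{i, v} // blocks until some worker is free
    }
    close(jobs) // no more work: lets the workers' range loops finish
    wg.Wait()
    return results
}

func main() {
    fmt.Println(squareAll([]int{3, 2, 1}, 2)) // [9 4 1]
}
```

The pool creates only workers goroutines in total. The chLimit approach creates one goroutine per task, though no more than the limit exist at any moment.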

Note one other difference in the complete chLimit example: the per-task result channels are now buffered.

chs[i] = make(chan string, 1)

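Recall the earlier examples: a send on an unbuffered channel blocks until the value is received. In this program, the values in chs are received only by the loop below, which runs after every goroutine has been started. Suppose chs[i] were unbuffered. A running task would block on its send in Run() and never release its chLimit slot. Meanwhile the main goroutine would stay blocked on chLimit <- true and never reach that loop. The result: deadlock!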

    for _, ch := range chs {
        fmt.Println(<-ch)
    }

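So the fix is simply to give each result channel a buffer of 1. Each task can then store its result and release its slot without waiting for the main goroutine.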
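The full pattern can be packaged as a reusable function: a limiting channel plus one buffered result channel per task. The sketch below shows one way to do it under stated assumptions. It is not the article’s code. runLimited is a hypothetical generic helper that takes tasks as func() string values, uses chan struct{} for the limit, and assumes limit is at least 1.

```go
package main

import (
    "fmt"
    "strings"
)

// runLimited is a hypothetical helper: at most `limit` tasks run at once,
// and results are returned in input order.
func runLimited(tasks []func() string, limit int) []string {
    sem := make(chan struct{}, limit)
    chs := make([]chan string, len(tasks))
    for i, task := range tasks {
        // Buffer of 1: the task can store its result and release its slot
        // even though nobody is receiving yet, which avoids the deadlock.
        chs[i] = make(chan string, 1)
        sem <- struct{}{} // blocks while `limit` tasks are running
        go func(task func() string, out chan<- string) {
            out <- task() // never blocks thanks to the buffer
            <-sem         // free the slot for the next task
        }(task, chs[i])
    }
    results := make([]string, len(tasks))
    for i, ch := range chs {
        results[i] = <-ch
    }
    return results
}

func main() {
    words := []string{"go", "chan", "select"}
    var tasks []func() string
    for _, w := range words {
        w := w // capture the per-iteration value (needed before Go 1.22)
        tasks = append(tasks, func() string { return strings.ToUpper(w) })
    }
    fmt.Println(runLimited(tasks, 1)) // [GO CHAN SELECT]
}
```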



Reprinted with authorization.

CC BY-SA