Watching the excellent friends around me who started blogging early, I feel I am lagging behind. But better late than never, so I am encouraging myself to follow their lead. Taking advantage of a sunny last weekend of the year, I am writing my first blog post to usher in 2019 and to make good on the goals I set for myself this year.

One of the main reasons for moving from PHP to Go (from "PHPer" to "Gopher") is the constant demand for performance and concurrency. Another is Go's native concurrency support, which can deliver the same high availability with fewer machine resources and therefore at considerably lower cost. This article walks through a Go concurrency demo I built, writes down the pitfalls I ran into, and shares what I learned.

1. There are three main ways to control concurrency in Go:

A) Channel, either unbuffered or buffered;

B) WaitGroup, from the sync package, which synchronizes a group of goroutines;

C) Context, which passes and shares data (and cancellation signals) between goroutines along a call chain;

This demo mainly uses the first two. See the official documentation for their basic usage.

2. Demo requirements and analysis:

Requirement: implement efficient email sending for an EDM (email direct marketing) campaign. It must support multiple countries (each can be treated as a separate task), record the sending status of each task (the current number of successes and failures), and support stop and resend operations.

Analysis: From these requirements, the countries (tasks) can be processed concurrently, and within a single task the emails can be sent concurrently in batches. Together this makes EDM sending fast and efficient.

3. Demo source code:

3.1 main.go

package main

import (
  "bufio"
  "fmt"
  "io"
  "log"
  "os"
  "strconv"
  "sync"
  "time"
)

var (
  batchLength = 20
  wg          sync.WaitGroup
  finish      = make(chan bool)
)

func main() {
  startTime := time.Now().UnixNano()

  for i := 1; i <= 3; i++ {
    filename := "./task/edm" + strconv.Itoa(i) + ".txt"
    start := 60

    go RunTask(filename, start, batchLength)
  }

  // Main blocks and waits for goroutine to complete
  fmt.Println(<-finish)

  fmt.Println("finished all tasks.")

  endTime := time.Now().UnixNano()
  fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}

// a single task
func RunTask(filename string, start, length int) (retErr error) {
  for {
    readLine, err := ReadLines(filename, start, length)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    fmt.Println("current line:", readLine)

    start += length

    // Wait for a batch to complete before entering the next batch
    //wg.Wait()
  }

  wg.Wait()
  finish <- true

  return retErr
}

Note the position of wg.Wait() above (discussed below): it comes before the send on the finish channel, so each task first waits for its child goroutines to finish, then notifies the main goroutine through the unbuffered finish channel, after which main exits.

ReadLines() reads the specified lines:

// Read the specified row
func ReadLines(filename string, start, length int) (line int, retErr error) {
  fmt.Println("current file:", filename)

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // Skip the line before the start line -ReadString
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {
      wg.Add(1)
      // go executes concurrently
      go SendEmail(line)
      if startLine == endLine {
        break
      }
    }

    startLine++
  }

  return startLine, retErr
}

// Emailing is simulated
func SendEmail(email string) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

Run main.go above, and all the emails (each line in ./task/edm1.txt is one mailbox) are sent by the three tasks concurrently within about 1s:

true
finished all tasks.
Total cost(ms): 1001
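In the run above, main printed true only once: it receives a single value from finish even though three tasks send on it, so main can exit as soon as any one task finishes. Below is a minimal sketch of the same "wait for children, then signal" pattern in which the receiver takes one value per task. It assumes in-memory email lists instead of the ./task files and a 10ms sleep instead of 1s; runTask and runAll are hypothetical names, not the demo's functions.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runTask is a hypothetical stand-in for RunTask: it starts one goroutine
// per email, waits for them with its own WaitGroup, then signals finish.
func runTask(id int, emails []string, finish chan<- int) {
	var wg sync.WaitGroup
	for range emails {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond) // simulated send
		}()
	}
	wg.Wait()    // every child goroutine of this task has finished
	finish <- id // only then notify the receiver
}

// runAll starts every task and receives from finish exactly once per task,
// so it returns only after all tasks are done.
func runAll(tasks [][]string) []int {
	finish := make(chan int)
	for i, emails := range tasks {
		go runTask(i, emails, finish)
	}
	done := make([]int, 0, len(tasks))
	for range tasks {
		done = append(done, <-finish)
	}
	return done
}

func main() {
	tasks := [][]string{{"a@x", "b@x"}, {"c@x"}, {"d@x", "e@x", "f@x"}}
	fmt.Println("tasks finished:", len(runAll(tasks))) // tasks finished: 3
}
```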

In this version the batchLength = 20 batching has no real effect yet: nothing waits for one batch to finish before the next starts, so all emails go out at once. Batching matters because without it, if any task or any single email fails, the next run cannot know which users have already received the email, and some will be sent twice. With batches, after an error the next run can resume from the end line of the last completed batch. At most one batchLength worth of emails is affected, which is acceptable.
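The resume logic can be sketched as a small pure function. This is a hypothetical helper, not part of the demo: it assumes ReadLines' convention that a batch starting at start covers lines start+1 through start+length (1-based, inclusive), and that each file has 100 lines. If a run fails inside a batch, restarting with start set to the previous batch's last line resends at most that one batch.

```go
package main

import "fmt"

// batchRanges is a hypothetical helper mirroring ReadLines' convention:
// a batch starting at `start` covers lines start+1 .. start+length
// (1-based, inclusive), capped at total. Each pair is {firstLine, lastLine};
// lastLine is the resume point for the next batch.
func batchRanges(start, length, total int) [][2]int {
	if length <= 0 {
		return nil // avoid an infinite loop on a bad batch size
	}
	var out [][2]int
	for start < total {
		end := start + length
		if end > total {
			end = total
		}
		out = append(out, [2]int{start + 1, end})
		start = end
	}
	return out
}

func main() {
	// 100 lines, resuming after line 60, batches of 20.
	for _, r := range batchRanges(60, 20, 100) {
		fmt.Println(r[0], "-", r[1])
	}
	// 61 - 80
	// 81 - 100
}
```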

Now comment out the wg.Wait() after the loop and uncomment the one inside the loop, so that each batch is waited for before the next one starts:

// a single task
func RunTask(filename string, start, length int) (retErr error) {
  for {
    readLine, err := ReadLines(filename, start, length)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    fmt.Println("current line:", readLine)

    start += length

    // Wait for a batch to complete before entering the next batch
    wg.Wait()
  }

  //wg.Wait()
  finish <- true

  return retErr
}

Running it now panics:

panic: sync: WaitGroup is reused before previous Wait has returned

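The panic means the WaitGroup is being reused while a previous Wait is still in progress. Because wg is a single global variable shared by all three tasks, one task calls Add for its next batch while another task's Wait has not yet returned. Sharing one WaitGroup this way is a misuse. How should it be restructured?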
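One way out is to give each task its own WaitGroup. The sync documentation allows reusing a WaitGroup for several independent sets of events only if new Add calls happen after all previous Wait calls have returned, which holds when a single task reuses its own WaitGroup batch after batch. The sketch below is hypothetical (runTaskBatches and runAllTasks are not the demo's functions; it uses in-memory lists and a 5ms sleep as the simulated send). A separate outer WaitGroup tracks the tasks themselves.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runTaskBatches sends one task's emails in batches. The WaitGroup is local
// to this task, so no other task can call Add on it while this task waits.
func runTaskBatches(emails []string, batchLen int, sent *int64) {
	var wg sync.WaitGroup // owned by this task only
	for start := 0; start < len(emails); start += batchLen {
		end := start + batchLen
		if end > len(emails) {
			end = len(emails)
		}
		for i := start; i < end; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				time.Sleep(5 * time.Millisecond) // simulated send
				atomic.AddInt64(sent, 1)
			}()
		}
		wg.Wait() // Wait returns before the next batch calls Add again
	}
}

// runAllTasks runs every task concurrently and waits for all of them
// with a separate outer WaitGroup.
func runAllTasks(tasks [][]string, batchLen int) int64 {
	var sent int64
	var outer sync.WaitGroup
	for _, emails := range tasks {
		outer.Add(1)
		go func(list []string) {
			defer outer.Done()
			runTaskBatches(list, batchLen, &sent)
		}(emails)
	}
	outer.Wait()
	return sent
}

func main() {
	tasks := [][]string{{"a", "b", "c"}, {"d", "e"}, {"f"}}
	fmt.Println("sent:", runAllTasks(tasks, 2)) // sent: 6
}
```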

3.2 main.go

package main

import (
  "bufio"
  "fmt"
  "io"
  "log"
  "os"
  "strconv"
  "sync"
  "time"
)

var (
  batchLength = 10
  outerWg     sync.WaitGroup
)

func main() {
  startTime := time.Now().UnixNano()

  for i := 1; i <= 3; i++ {
    filename := "./task/edm" + strconv.Itoa(i) + ".txt"
    start := 60

    outerWg.Add(1)
    go RunTask(filename, start, batchLength)
  }

  // Main blocks and waits for goroutine to complete
  outerWg.Wait()

  fmt.Println("finished all tasks.")

  endTime := time.Now().UnixNano()
  fmt.Println("Total cost(ms):", (endTime-startTime)/1e6)
}

// a single task
func RunTask(filename string, start, length int) (retErr error) {
  for {
    isFinish := make(chan bool)
    readLine, err := ReadLines(filename, start, length, isFinish)
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      fmt.Println(err)
      retErr = err
      break
    }

    // Wait for a batch to complete before entering the next batch
    fmt.Println("current line:", readLine)
    start += length
    <-isFinish

    // Close the channel: nothing more will be sent on it
    close(isFinish)
  }

  outerWg.Done()

  return retErr
}

Running this version fails with a deadlock:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x55fe7c)
	/usr/local/go/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0x55fe70)
	/usr/local/go/src/sync/waitgroup.go:131 +0x72
main.main()
	/home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1ab

goroutine 5 [chan send]:
main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0)

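On closer inspection, isFinish is defined above as an unbuffered channel, and a send on an unbuffered channel blocks until another goroutine receives from it. ReadLines sends isFinish <- true at the end of a batch, but the only receiver is the <-isFinish in RunTask, which runs in the same goroutine and can only execute after ReadLines returns. The send therefore never completes (goroutine 5 is stuck in chan send), main stays blocked in outerWg.Wait(), and once the SendEmail() goroutines finish, every goroutine is asleep: deadlock!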
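The difference between the two channel kinds can be checked without deadlocking by using a select with a default case, which never blocks. The sketch below is hypothetical (trySend and sendThenReceive are illustrative names): it mimics the RunTask/ReadLines shape, where one goroutine first sends and only afterwards receives, and reports whether that send could complete.

```go
package main

import "fmt"

// trySend reports whether a send on ch would complete immediately.
// A select with a default case never blocks.
func trySend(ch chan bool) bool {
	select {
	case ch <- true:
		return true
	default:
		return false
	}
}

// sendThenReceive: the same goroutine sends first and receives afterwards.
// It returns false instead of deadlocking when the send would block.
func sendThenReceive(capacity int) bool {
	ch := make(chan bool, capacity)
	if !trySend(ch) {
		return false // a plain `ch <- true` would block forever here
	}
	return <-ch
}

func main() {
	fmt.Println(sendThenReceive(0)) // false: unbuffered, nobody is receiving
	fmt.Println(sendThenReceive(1)) // true: the buffer holds the value
}
```

Note that a capacity of 1 only helps because ReadLines sends at most once per call; a second send before the receive would block again.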

Change isFinish to a buffered channel and try again:

isFinish := make(chan bool, 1)
// Read the specified row
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
  fmt.Println("current file:", filename)

  // Control each batch before the next batch
  var wg sync.WaitGroup

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // Skip the line before the start line -ReadString
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {

      wg.Add(1)
      // go executes concurrently
      go SendEmail(line, wg)
      if startLine == endLine {
        isFinish <- true
        break
      }
    }

    startLine++
  }

  wg.Wait()

  return startLine, retErr
}

// Emailing is simulated
func SendEmail(email string, wg sync.WaitGroup) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

Run it, and it still fails :(

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x55fe7c)
	/usr/local/go/src/runtime/sema.go:56 +0x39
sync.(*WaitGroup).Wait(0x55fe70)

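This time the inner WaitGroup is causing the deadlock. Looking further, the inner wg is passed to SendEmail by value, so each goroutine calls Done() on its own copy and the counter that wg.Wait() is watching never reaches zero. It must be passed as a pointer instead. The offending call is: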

// go executes concurrently
go SendEmail(line, wg)

The final version, passing &wg, is as follows:

// Read the specified row
func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {
  fmt.Println("current file:", filename)

  // Control each batch before the next batch
  var wg sync.WaitGroup

  fileObj, err := os.Open(filename)
  if err != nil {
    panic(err)
  }
  defer fileObj.Close()

  // Skip the line before the start line -ReadString
  startLine := 1
  endLine := start + length
  reader := bufio.NewReader(fileObj)
  for {
    line, err := reader.ReadString(byte('\n'))
    if err == io.EOF {
      fmt.Println("Read EOF:", filename)
      retErr = err
      break
    }
    if err != nil {
      log.Fatal(err)
      retErr = err
      break
    }

    if startLine > start && startLine <= endLine {

      wg.Add(1)
      // go executes concurrently
      go SendEmail(line, &wg)
      if startLine == endLine {
        isFinish <- true
        break
      }
    }

    startLine++
  }

  wg.Wait()

  return startLine, retErr
}

// Emailing is simulated
func SendEmail(email string, wg *sync.WaitGroup) error {
  defer wg.Done()

  time.Sleep(time.Second * 1)
  fmt.Println(email)

  return nil
}

Run it again, and this time it finally succeeds :)

current line: 100
current file: ./task/edm2.txt
Read EOF: ./task/edm2.txt
Read EOF: ./task/edm2.txt
finished all tasks.
Total cost(ms): 4003

Each task simulates a 100-line file and starts from line 60. The three tasks run concurrently; within each task the emails are sent concurrently in batches, and the next batch starts only after the current one completes. With batches of 10, lines 61 to 100 form 4 batches of about 1s each, which matches the total of roughly 4s. For the full source code, see GitHub: github.com/astraw99/ed…

4. Summary:

This article uses two levels of nested Go concurrency to simulate high-performance concurrent EDM sending. The main logic now works; finer-grained control of failed lines, task interruption, and re-execution will be covered next time. The pitfalls are summarized as follows:

A) WaitGroup is typically used so that a goroutine can wait for all of its child goroutines to finish before exiting gracefully. When nesting, pay attention to where wg.Wait() is placed, and do not share one WaitGroup between independent groups of goroutines;

B) Use channels carefully: a send on an unbuffered channel blocks the current goroutine until another goroutine receives, while a send on a buffered channel does not block as long as the buffer (cap) is not full. Close a channel once nothing more will be sent on it;

C) Pay attention to passing by value versus by pointer between functions; a sync.WaitGroup in particular must be passed by pointer;