directory

  • WaitGroup introduction
  • The realization of the WaitGroup
    • Add
    • Done
    • Wait

WaitGroup introduction

WaitGroup is also a common syntax used in go concurrency, so here we take a look at the use of waitGroup and its source code.

WaitGroup is also part of the Sync package and is a concurrent primitive for task choreography. It mainly solves the concurrency-wait problem: For example, there are three goroutines, namely goroutineA, goroutineB, and goroutineC, and goroutineA needs to wait until the execution of goroutineB and goroutineC is complete. Then you can execute the subsequent business logic. This can be easily resolved using WaitGroup.

In this scenario, goroutineA is the primary Goroutine, and goroutineB and goroutineC are the child Goroutines. GoroutineA needs to wait at the checkout point for both goroutineB and goroutineC to complete. If the execution of the goroutine is not complete, then goroutineA will block at the checkpoint. Execution cannot continue until all goroutines are complete.

Code implementation:

package main

import (
  "fmt"
  "sync"
)

func goroutineB(wg *sync.WaitGroup) {
  defer wg.Done()
  fmt.Println("goroutineB Execute")
  time.Sleep(time.Second)
}

func goroutineC(wg *sync.WaitGroup) {
  defer wg.Done()
  fmt.Println("goroutineC Execute")
  time.Sleep(time.Second)
}

func main(a) {
  var wg sync.WaitGroup
  wg.Add(2)
  go goroutineB(&wg)
  go goroutineC(&wg)
  wg.Wait()
  fmt.Println("goroutineB and goroutineC finished...")}Copy the code

Running results:

goroutineC Execute
goroutineB Execute
goroutineB and goroutineC finished...
Copy the code

This is a simple WaitGroup operation, and its syntax is relatively simple, providing three methods as follows:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done(a)
func (wg *WaitGroup) Wait(a)
Copy the code
  • Add: Set the WaitGroup count (number of child Goroutines)

  • Done: Used to subtract the WaitGroup count by one, starting with Add(-1)

  • Wait: The goroutine calling this method blocks until the technical value of WaitGroup is 0

Next, we’ll take a look at the source code implementation of WaitGroup and make it impossible to escape. It’s relatively light, with a few dozen lines without comments, and is a good choice for beginners.

The realization of the WaitGroup

First, let’s look at the WaitGroup data structure, which contains a secondary field for noCopy, a state1 field with compound meaning.

  • Secondary field of noCopy: Basically assist vet to check whether the WaitGroup instance is assigned by copy. I will analyze this field with you later

  • State1: compound field containing WaitGroup count, main gooutine blocking at checkpoint, and semaphore

type WaitGroup struct {
    // A technique to avoid copying is to tell THE VET tool that it violates the rules for copying
    noCopy noCopy
    // 64-bit (8bytes) values are divided into two segments. The high 32bit is counted and the low 32bit is waiter
    The other 32 bits are used as semaphores
    // Since atomic operations on 64-bit values require 64-bit alignment, which is not supported by the 32bit compiler, elements in arrays are different in different architectures
    // In general, the aligned 64bit will be found as state and the remaining 32bit will be used as semaphores
    state1 [3]uint32
}


// Get the address of state and the address of semaphore
func (wg *WaitGroup) state(a) (statep *uint64, semap *uint32) {
    if uintptr(unsafe.Pointer(&wg.state1))%8= =0 {
        // If the address is 64-bit aligned, the first two elements of the array are state and the last element is semaphore
        return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]}else {
        // If the address is 32bit aligned, the last two elements of the array are used for state, which can be used for 64-bit atomic operations. The first element, 32bit, is used for semaphore
        return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]}}Copy the code

Because atomic operations on 64-bit integers require integer addresses to be 64-bit aligned, the composition of the state field is different for 64-bit and 32-bit environments.

In a 64-bit environment, the first element of state1 is the waiter number, the second element is the WaitGroup count, and the third element is the semaphore.

In a 32-bit environment, if state1 is not a 64-bit aligned address, the first element of state1 is the semaphore, followed by waiter and count.

Next, we will look at the implementation principles of Add, Done and Wait methods.

Add

Add method implementation ideas:

The Add method counts the value in the state1 field of the main operation. When the Add method is called, the delta parameter value is first moved 32 bits to the left (the count is 32 bits higher) and then internally added to the count by atomic operations. Note that delta can be either positive or negative because the Done() method is called internally through the Add(-1) method.

The code implementation is as follows:

func (wg *WaitGroup) Add(delta int) {
  // statep stands for wait numbers and count values
  // The lower 32 bits represent the wait number, and the higher 32 bits represent the count
   statep, semap := wg.state()
   // uint64(delta)<<32 Move delta 32 bits to the left
    // Since the high 32 bits represent the count, delta is moved 32 to the left to increase the technical value
   state := atomic.AddUint64(statep, uint64(delta)<<32)
   // The current count
   v := int32(state >> 32)
   // The number of waits blocked at checkpoints
   w := uint32(state)
   if v > 0 || w == 0 {
      return
   }
   
   // If v is 0 and w is not 0, the value of state is the number of waiter
    // Set waiter to 0 because v is also 0, so the combination *statep is 0. All waiters are required and awakened at this point
   *statep = 0
   for; w ! =0; w-- {
      runtime_Semrelease(semap, false.0)}}Copy the code

Done

Inside is the Add(-1) method, which I won’t go into here.

The Done method actually decays the counter by 1
func (wg *WaitGroup) Done(a) { 
  wg.Add(- 1)}Copy the code

Wait

Wait implementation:

Constantly check the state value. If the count is zero, all of the child Goroutines have been executed and the caller does not have to wait. If the count is greater than zero, the caller becomes a waiting person, joins the wait queue, and blocks itself.

The code implementation is as follows:

func (wg *WaitGroup) Wait(a) {
   // statep stands for wait numbers and count values
   // The lower 32 bits represent the wait number, and the higher 32 bits represent the count
   statep, semap := wg.state()
   for {
      state := atomic.LoadUint64(statep)
      // Shift state 32 bits to the right to indicate the current count
      v := int32(state >> 32)
      // w stands for waiter
      w := uint32(state)
      if v == 0 {
         // If the current count value is zero, the current child goroutine has been executed
         return
      }
      // Otherwise use the atomic operation to increment the state value by one.
      if atomic.CompareAndSwapUint64(statep, state, state+1) {
         // Block sleep wait
         runtime_Semacquire(semap)
         // Wake up, stop blocking, return
         return}}}Copy the code

At this point, the basic usage and implementation principle of waitGroup has been introduced, I believe you have a different harvest, let’s see next time.

The article will be updated continuously. You can search “Maimo Coding” on wechat to read it for the first time. Every day to share quality articles, big factory experience, big factory face, help interview, is worth paying attention to every programmer platform.