This article is based on go1.14.6.

Usage

  1. Call WaitGroup.Add(1) before creating each new goroutine, and call WaitGroup.Done() when that goroutine finishes executing.
  2. Call WaitGroup.Wait() in the goroutine that needs to block and wait. When the call returns, all the goroutines it was waiting for have finished executing.

Usage example

package main

import (
	"fmt"
	"sync"
)

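func main() {
	wg := sync.WaitGroup{}

	for i := 0; i < 3; i++ {
		wg.Add(1) // register the goroutine before starting it
		go func() {
			fmt.Println("hello world!")
			wg.Done() // signal that this goroutine has finished
		}()
	}

	wg.Wait() // block until all three goroutines have called Done
	fmt.Println("main done!")
}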

The code above shows a typical sync.WaitGroup usage scenario: one goroutine waits for several other goroutines to finish.

In this code, sync.WaitGroup guarantees that every goroutine started in the for loop has executed fmt.Println("hello world!") before main runs fmt.Println("main done!") and the program exits.

Reading the source code

The source code below comes from src/sync/waitgroup.go.

To understand how sync.WaitGroup is implemented and which pitfalls you may run into when using it, you need a solid understanding of its source code. The source of sync.WaitGroup is short, but it handles many concurrent cases. Its overall difficulty is moderate, which makes it a good starting point for Go beginners.

The struct

type WaitGroup struct {
	// noCopy is a helper type used throughout the sync package; it is an empty struct.
	// If a WaitGroup is copied by value after being initialized and used, go vet reports it
	noCopy noCopy

	// state1 stores three different values:
	// 1. The Add/Done counter (Add increments it, Done decrements it); Wait returns immediately when it is 0
	// 2. The number of goroutines blocked in Wait()
	// 3. The semaphore used for waiting and waking up, described later
	state1 [3]uint32
}
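The sync package's noCopy type is unexported, so user code cannot embed it directly. A commonly used workaround, sketched below under the assumption that you want the same go vet protection for your own types, is to declare an equivalent empty type whose pointer has Lock and Unlock methods; go vet's copylocks check then treats it as a lock and reports value copies of any struct that contains it. The Counter type here is hypothetical:

```go
package main

import "fmt"

// noCopy mirrors the idea of sync's unexported noCopy type: it occupies
// no memory, and because *noCopy has Lock/Unlock methods, go vet's
// copylocks check reports copies of any struct that embeds it by value.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

// Counter is a hypothetical type that should never be copied after use.
type Counter struct {
	noCopy noCopy
	n      int
}

func (c *Counter) Inc() { c.n++ }

func (c *Counter) Value() int { return c.n }

func main() {
	c := &Counter{}
	c.Inc()
	c.Inc()
	fmt.Println(c.Value()) // 2

	// Restoring the next two lines still compiles, but go vet reports
	// that the assignment copies a lock value.
	// d := *c
	// _ = d
}
```

Like sync's own noCopy, this only affects static analysis: go build accepts the copy, and nothing is checked at run time.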

Methods

state

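func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		// state1 is 8-byte aligned: the first 8 bytes hold the state, the last 4 the semaphore
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
	} else {
		// state1 is only 4-byte aligned: the first 4 bytes hold the semaphore, the last 8 the state
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
}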

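The state method interprets the state1 field of WaitGroup and returns two pointers. 64-bit atomic operations require 8-byte alignment, which 32-bit platforms do not guarantee, so state1 is declared as 12 bytes and state uses whichever 8 of them are 8-byte aligned for the 64-bit state, leaving the remaining 4 bytes for the semaphore.

statep points to a uint64 whose high 32 bits store the counter and whose low 32 bits store the number of goroutines blocked in Wait.

semap points to the semaphore used for waiting and waking up, explained later.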
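To make the packing concrete, the sketch below (the helper names addDelta and addWaiter are hypothetical, not part of the sync package) reproduces the two atomic updates applied to *statep: Add shifts delta into the high 32 bits, and Wait adds 1 to the low 32 bits. It also shows why uint64(delta)<<32 works for negative deltas: the two's-complement bits wrap around modulo 2^64, which amounts to a subtraction from the counter.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// addDelta mimics the first step of WaitGroup.Add: it atomically adds
// delta to the high 32 bits (counter) and returns counter v and waiters w.
func addDelta(state *uint64, delta int) (v int32, w uint32) {
	s := atomic.AddUint64(state, uint64(delta)<<32)
	return int32(s >> 32), uint32(s)
}

// addWaiter mimics how Wait registers itself: +1 on the low 32 bits.
func addWaiter(state *uint64) (v int32, w uint32) {
	s := atomic.AddUint64(state, 1)
	return int32(s >> 32), uint32(s)
}

func main() {
	var state uint64

	fmt.Println(addDelta(&state, 3))  // 3 0
	fmt.Println(addWaiter(&state))    // 3 1
	fmt.Println(addDelta(&state, -1)) // 2 1: a negative delta wraps correctly
	fmt.Println(addDelta(&state, -3)) // -1 1: the case Add turns into a panic
}
```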

Add/Done

Done simply calls wg.Add(-1), so it is not explained separately.

func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	if race.Enabled {
		// ... omitted. Readers new to the source can skip race.Enabled blocks:
		// they only matter when compiling with -race and are unrelated to the main logic
	}

	// Atomically add delta to the counter (the high 32 bits of *statep)
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32) // v is the counter value after this Add
	w := uint32(state)      // w is the number of goroutines blocked in Wait()
	if race.Enabled && delta > 0 && v == int32(delta) {
		// ... omitted (race detector only)
	}

	// Normally Add(1) increments the counter v for every new goroutine and
	// Done() decrements it when that goroutine finishes, so v can never be negative
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// If Add() and Wait() are called concurrently:
	// if Wait() runs first, it sees v == 0 and returns immediately;
	// if Add() runs first, v > 0 and Wait() blocks.
	// The outcome depends on a data race, so panic to alert the caller.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// While v > 0 there is nothing to do for the goroutines blocked in Wait();
	// if w == 0, no goroutine is waiting at all
	if v > 0 || w == 0 {
		return
	}

	// Note: without misuse, execution only reaches this point when v == 0 && w > 0

	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.

	// Two kinds of concurrent misuse are detected here:
	// 1. A new Add() while w is about to drop from >0 to 0, since the new goroutine might not be waited for
	// 2. A new Wait() incrementing w after v has reached 0
	// Either one modifies *statep, so state != *statep
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reaching here means all w goroutines blocked in Wait() must be woken up.
	// Since v == 0 and w can also be reset to 0, *statep is simply set to 0
	*statep = 0
	for ; w != 0; w-- {
		// runtime_Semrelease is provided by the runtime package;
		// it wakes up one goroutine blocked in runtime_Semacquire
		runtime_Semrelease(semap, false, 0)
	}
}

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

func (wg *WaitGroup) Wait() {
	statep, semap := wg.state()
	if race.Enabled {
		// ... omitted. Readers new to the source can skip race.Enabled blocks:
		// they only matter when compiling with -race and are unrelated to the main logic
	}
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32) // counter
		w := uint32(state)      // number of goroutines blocked in Wait()
		if v == 0 {
			// Counter v == 0: nothing to wait for, return immediately
			if race.Enabled {
				// ... omitted (race detector only)
			}
			return
		}
		// Try to increment w atomically; if the CAS fails, retry in the for loop (similar to optimistic locking)
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// ... omitted (race detector only)
			}
			// Block until v drops to 0; when Add() brings v down to 0, it wakes this goroutine up
			runtime_Semacquire(semap)

			// Add() resets *statep to 0 before waking the waiters, so v == 0 && w == 0 is expected here;
			// a non-zero *statep means Add() was called again (the WaitGroup was reused) before this Wait() returned
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				// ... omitted (race detector only)
			}
			return
		}
	}
}

Conclusion

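  1. Once a WaitGroup has been initialized and used (for example, after Add() has been called), it must not be copied by value; when it has to be passed to another function, pass its address (*sync.WaitGroup).
  2. Do not call Add() to increment the counter while Wait() may be running: the Add() calls must happen before Wait(), and after that only Done() should be called to bring the counter down to 0. A WaitGroup can be reused only after the previous Wait() has returned.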