sync.Cond implements a condition variable, used when one goroutine or a group of goroutines must be woken up once some condition is met. Each Cond has an associated Locker L, usually a *sync.Mutex or *sync.RWMutex, chosen according to the kind of locking required.

Basic usage

Before dissecting the source code, let’s look at how sync.Cond is used. Suppose we implement a FIFO queue:

package main

import (
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"time"
)

type FIFO struct {
	lock  sync.Mutex
	cond  *sync.Cond
	queue []int
}

type Queue interface {
	Pop() int
	Offer(num int) error
}

// Offer appends num to the queue and wakes every goroutine blocked in Pop.
func (f *FIFO) Offer(num int) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.queue = append(f.queue, num)
	f.cond.Broadcast()
	return nil
}

// Pop blocks until the queue is non-empty, then removes and returns its head.
func (f *FIFO) Pop() int {
	f.lock.Lock()
	defer f.lock.Unlock()
	// Re-check after every wakeup: another consumer may have emptied
	// the queue before this goroutine reacquired the lock.
	for len(f.queue) == 0 {
		f.cond.Wait()
	}
	item := f.queue[0]
	f.queue = f.queue[1:]
	return item
}

func main() {
	fifo := &FIFO{queue: []int{}}
	// The Cond must wrap the same mutex that Offer and Pop lock. Copying a
	// local sync.Mutex into the struct and passing the local's address
	// would make Wait unlock a mutex that is not held.
	fifo.cond = sync.NewCond(&fifo.lock)

	go func() {
		for {
			fifo.Offer(rand.Int())
		}
	}()
	time.Sleep(time.Second)
	go func() {
		for {
			fmt.Printf("goroutine1 pop-->%d\n", fifo.Pop())
		}
	}()
	go func() {
		for {
			fmt.Printf("goroutine2 pop-->%d\n", fifo.Pop())
		}
	}()

	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt)
	<-ch
}


The FIFO queue has two operations, Offer and Pop. One goroutine keeps putting data into the queue, and two goroutines keep taking data out of it.

  1. Pop checks whether the queue is empty (len(f.queue) == 0). If it is, Pop calls f.cond.Wait() to suspend the goroutine.
  2. Once Offer has added data, it calls f.cond.Broadcast() to wake up all goroutines waiting on this Cond. sync.Cond also provides Signal(), which wakes up just one waiter; the pair is similar to notify() and notifyAll() in Java.

To summarize the general usage of sync.Cond:

  1. Declare a mutex. Either sync.Mutex or sync.RWMutex can be used, depending on the situation.
  2. Call sync.NewCond(l Locker) *Cond with that mutex as the argument. Note that a pointer is passed: the Lock and Unlock methods of sync.Mutex have pointer receivers. Passing the address ensures that c.L.Lock() and c.L.Unlock() operate on the lock that actually protects the condition rather than on a copy. A copy would break mutual exclusion and can cause deadlocks.
  3. While the business condition is not yet met, call cond.Wait() to suspend the goroutine.
  4. Call cond.Broadcast() to wake up all suspended goroutines, or cond.Signal() to wake up only the one that has been waiting the longest.
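Either lock type works. The sketch below assumes the waiters only need to read the shared state. With a sync.RWMutex, the Cond can be given the read side through RWMutex.RLocker(), so Wait() releases and reacquires the read lock. The function readersSeeVersion and the version variable are hypothetical, chosen for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// readersSeeVersion starts n readers that wait, under the read lock, until
// a hypothetical version number becomes non-zero, and returns what each saw.
func readersSeeVersion(n int) []int {
	var rw sync.RWMutex
	// RLocker returns a Locker whose Lock/Unlock call RLock/RUnlock.
	cond := sync.NewCond(rw.RLocker())
	version := 0

	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			cond.L.Lock() // rw.RLock()
			for version == 0 {
				cond.Wait() // RUnlock, park, then RLock again
			}
			results[i] = version
			cond.L.Unlock() // rw.RUnlock()
		}(i)
	}

	// The writer changes the condition under the write lock, which
	// excludes every reader, and then wakes all of them.
	rw.Lock()
	version = 7
	rw.Unlock()
	cond.Broadcast()

	wg.Wait()
	return results
}

func main() {
	fmt.Println(readersSeeVersion(3)) // [7 7 7]
}
```

Several readers can hold the read lock at once, so several goroutines may be inside Wait() concurrently. This is why the runtime takes each waiter's ticket with an atomic add rather than under c.L.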

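Note that cond.Wait() should always be used according to the following template. c.L is released while Wait() is waiting, so another goroutine may change the condition before the woken goroutine reacquires the lock. The caller therefore cannot assume the condition holds when Wait() returns and must re-check it in a loop: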

    c.L.Lock()
    for !condition() {
        c.Wait()
    }
    ... make use of condition ...
    c.L.Unlock()
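Below is a minimal, self-contained instance of this template, assuming a hypothetical ready flag as the condition. The waiter re-checks the flag in a loop, the condition is changed only while holding c.L, and Signal() is called after unlocking. The sync documentation permits this, because the caller is not required to hold c.L when signaling:

```go
package main

import (
	"fmt"
	"sync"
)

// awaitValue starts one waiter that blocks until the hypothetical `ready`
// flag is set, then hands back the value published alongside it.
func awaitValue() int {
	var mu sync.Mutex
	c := sync.NewCond(&mu)
	ready := false
	value := 0
	done := make(chan int)

	go func() {
		c.L.Lock()
		for !ready { // condition re-checked after every wakeup
			c.Wait()
		}
		v := value // use the condition while still holding c.L
		c.L.Unlock()
		done <- v
	}()

	c.L.Lock()
	value = 42
	ready = true // modify the condition only under c.L
	c.L.Unlock()
	c.Signal() // wake one waiter, if any is parked

	return <-done
}

func main() {
	fmt.Println(awaitValue()) // 42
}
```

If Signal() runs before the waiter reaches Wait(), no wakeup is lost. The waiter reads ready == true under the lock and never waits.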

Source code analysis

The data structure

Before analyzing the individual methods, let’s look at the data structure of sync.Cond. The source code is as follows:

type Cond struct {
	noCopy noCopy // Cond is not allowed to be copied
	// L is held while observing or changing the condition
	L Locker
	// notify is the notification list; goroutines that call Wait() are placed in it
	notify  notifyList
	checker copyChecker // Checks whether the Cond instance has been copied
}


Besides L, the Locker we just discussed, the struct contains copyChecker, which detects whether the Cond instance has been copied. It has a single method:

type copyChecker uintptr

func (c *copyChecker) check() {
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

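copyChecker is declared as type copyChecker uintptr, so its initial value is 0. On the first call to check(), the CompareAndSwap stores the checker's own address in it. The comparison uintptr(*c) != uintptr(unsafe.Pointer(c)) is evaluated twice because another goroutine may have stored the address between the first comparison and the CAS. In that case the CAS fails, but the stored value still equals the address. If the second comparison also fails, the stored address differs from the checker's current address. That means the sync.Cond was copied to a new memory location after its first use, and check() panics.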
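To see the check in action, the sketch below reimplements the same three-part condition in user code; the real copyChecker is unexported in package sync. The method copied() returns a bool instead of panicking, and the guarded type is a hypothetical stand-in for sync.Cond:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// copyChecker mirrors the idea of sync's unexported type: it remembers
// the address it lived at when first checked.
type copyChecker uintptr

// copied applies the same condition as sync's check(), returning true
// where the real method would panic.
func (c *copyChecker) copied() bool {
	return uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c))
}

// guarded is a hypothetical type embedding a checker, like sync.Cond does.
type guarded struct {
	checker copyChecker
}

// Package-level pointers keep both values on the heap at fixed addresses.
var original, duplicate *guarded

func copyDemo() (first, second, afterCopy bool) {
	original = new(guarded)
	first = original.checker.copied()  // stores its address: false
	second = original.checker.copied() // address unchanged: false

	duplicate = new(guarded)
	*duplicate = *original // copy after first use; stored address is stale
	afterCopy = duplicate.checker.copied()
	return first, second, afterCopy
}

func main() {
	fmt.Println(copyDemo()) // false false true
}
```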

The most interesting part of sync.Cond is its notifyList, which is defined in the runtime:

type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32 // Ticket handed to the next goroutine that calls Wait()

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32 // Ticket of the next waiter to be woken up

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

It contains three kinds of fields:

  • wait and notify: two unsigned integers, the ticket counters for Wait() calls and for wakeups respectively. Apart from wraparound, which the source comment explains, wait is always greater than or equal to notify.
  • lock mutex: not the sync.Mutex that Go developers use, but a simple mutex implemented inside the runtime. It is the same one used by semaRoot, the semaphore wait queue analyzed earlier.
  • head and tail: as the names suggest, these maintain a linked list, in this case of the goroutines (wrapped in sudog nodes) currently blocked on this sync.Cond.

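Overall, a sync.Cond combines a Locker L that protects the condition with a notifyList. The notifyList is a pair of ticket counters (wait and notify) plus a linked list of parked goroutines running from head to tail.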

Methods

Wait() operation

func (c *Cond) Wait() {
	// 1. Check whether the Cond has been copied
	c.checker.check()
	// 2. Take a ticket: notifyList.wait++
	t := runtime_notifyListAdd(&c.notify)
	// 3. Release the lock so that other goroutines can acquire it
	c.L.Unlock()
	// 4. Park the goroutine until ticket t is notified
	runtime_notifyListWait(&c.notify, t)
	// 5. Reacquire the lock before returning
	c.L.Lock()
}

The Wait() method goes through five steps:

  1. Call copyChecker.check() to make sure the sync.Cond has not been copied.
  2. Each call to Wait() atomically increments sync.Cond.notifyList.wait. This is the cornerstone of the FIFO behavior: the ticket each goroutine receives determines its place in the waiting order.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}
  3. Call c.L.Unlock() to release the lock. The current goroutine is about to be parked with gopark, so it must hand the lock over to other goroutines to avoid a deadlock.
  4. Call runtime_notifyListWait(&c.notify, t), which is a bit more involved:
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Return immediately if you have been awakened
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	// Store the ticket from notifyListAdd in s.ticket; this is the basis of the FIFO order
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	// Append the current goroutine to the tail of the notifyList
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	// Finally, release l.lock and park the current goroutine
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	// The goroutine has been woken up; release its sudog
	releaseSudog(s)
}

The main tasks are as follows:

  • Insert the current goroutine into the notifyList.
  • Call goparkunlock to release l.lock and suspend the current goroutine.
  5. When another goroutine calls Signal or Broadcast, the current goroutine is woken up, and Wait() calls c.L.Lock() to reacquire the lock.
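The ticket mechanics of notifyListAdd and notifyListWait can be modeled in user code. The sketch below is not the runtime implementation. ticketList, waiter, enqueue and waitDemo are hypothetical names, a sync.Mutex replaces the runtime mutex, and a closed channel replaces gopark/goready. It exercises the early-return path, where a notification arrives between taking a ticket and parking:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// waiter plays the role of a sudog: a ticket plus a way to be woken.
type waiter struct {
	ticket uint32
	wake   chan struct{} // closed when the waiter is notified
	next   *waiter
}

// ticketList is a hypothetical user-space model of the runtime notifyList.
type ticketList struct {
	wait       uint32     // next ticket to hand out, updated atomically
	mu         sync.Mutex // guards notify, head and tail in this model
	notify     uint32     // next ticket to be notified
	head, tail *waiter
}

// less is a wraparound-safe a < b, like the runtime helper of the same name.
func less(a, b uint32) bool { return int32(a-b) < 0 }

// add models notifyListAdd: take a ticket without holding mu.
func (l *ticketList) add() uint32 { return atomic.AddUint32(&l.wait, 1) - 1 }

// enqueue models notifyListWait up to gopark. It returns the channel the
// caller would block on; if ticket t was already notified, the channel
// comes back closed and nothing is queued.
func (l *ticketList) enqueue(t uint32) <-chan struct{} {
	l.mu.Lock()
	defer l.mu.Unlock()
	w := &waiter{ticket: t, wake: make(chan struct{})}
	if less(t, l.notify) {
		close(w.wake) // already notified: return immediately
		return w.wake
	}
	if l.tail == nil {
		l.head = w
	} else {
		l.tail.next = w
	}
	l.tail = w
	return w.wake
}

// waitDemo returns the tickets left in the list and whether the first
// waiter was released without parking.
func waitDemo() ([]uint32, bool) {
	var l ticketList
	t0 := l.add() // ticket 0
	t1 := l.add() // ticket 1

	// Simulate a Signal that lands after t0 was taken but before it parks.
	l.mu.Lock()
	l.notify++
	l.mu.Unlock()

	ch0 := l.enqueue(t0) // already notified, so not queued
	l.enqueue(t1)        // not notified yet, so queued

	released := false
	select {
	case <-ch0:
		released = true
	default:
	}
	var queued []uint32
	for w := l.head; w != nil; w = w.next {
		queued = append(queued, w.ticket)
	}
	return queued, released
}

func main() {
	fmt.Println(waitDemo()) // [1] true
}
```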

Signal operation

Signal wakes up the goroutine that has been waiting the longest. The caller is allowed, but not required, to hold c.L during the call.

func (c *Cond) Signal() {
	c.checker.check()
	runtime_notifyListNotifyOne(&c.notify)
}

The implementation is simple: it checks whether the sync.Cond has been copied and then calls runtime_notifyListNotifyOne:

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// wait == notify means no goroutine is waiting
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}
	lockWithRank(&l.lock, lockRankNotifyList)
	// Check again now that the lock is held
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Update the ticket number that needs to be woken up next
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	// Core of the FIFO behavior: walk the list and match sudog.ticket against t to find the waiter to wake
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

Main logic:

  1. Check whether any goroutine is waiting to be woken up; if none is, return immediately.
  2. Increment notify. The goroutine to wake is found by matching notify against sudog.ticket. Tickets are handed out in increasing order, so this gives FIFO semantics.
  3. Traverse the linked list held by notifyList, starting from head and following the next pointers. The scan is linear, O(n) in the worst case, but as the official comment puts it, "This scan looks linear but essentially always stops quickly."

One small detail: in Wait(), atomically incrementing notifyList.wait and inserting the goroutine into the wait list are two separate steps. Under contention, the nodes in the list can therefore be slightly out of ticket order. This does not affect the wakeup order, because Signal selects the waiter by its atomically assigned ticket rather than by its position in the list.
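The reordering can be reproduced with a small user-space model of notifyListNotifyOne. This is a sketch, not runtime code. The names are hypothetical, notify is guarded by a sync.Mutex instead of being read atomically outside the lock, and a woken flag stands in for readyWithTime. Two waiters enqueue out of ticket order, yet the first notification still wakes the lower ticket:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// waiter stands in for a sudog.
type waiter struct {
	ticket uint32
	woken  bool // set instead of calling readyWithTime
	next   *waiter
}

// ticketList is a hypothetical user-space model of notifyList.
type ticketList struct {
	wait       uint32     // next ticket to hand out (atomic)
	mu         sync.Mutex // guards notify, head and tail in this model
	notify     uint32     // next ticket to be notified
	head, tail *waiter
}

func (l *ticketList) add() uint32 { return atomic.AddUint32(&l.wait, 1) - 1 }

// enqueue appends a waiter holding ticket t to the tail of the list.
func (l *ticketList) enqueue(t uint32) *waiter {
	l.mu.Lock()
	defer l.mu.Unlock()
	w := &waiter{ticket: t}
	if l.tail == nil {
		l.head = w
	} else {
		l.tail.next = w
	}
	l.tail = w
	return w
}

// notifyOne models notifyListNotifyOne: advance notify, then unlink and
// wake the waiter whose ticket equals the old notify value.
func (l *ticketList) notifyOne() {
	l.mu.Lock()
	defer l.mu.Unlock()
	t := l.notify
	if t == atomic.LoadUint32(&l.wait) {
		return // nobody is waiting
	}
	l.notify = t + 1
	for p, s := (*waiter)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			if p != nil {
				p.next = s.next
			} else {
				l.head = s.next
			}
			if s.next == nil {
				l.tail = p
			}
			s.next = nil
			s.woken = true
			return
		}
	}
	// Not found: that goroutine holds a ticket but has not enqueued yet.
}

// signalDemo records {w0.woken, w1.woken} after each of two notifications.
func signalDemo() [2][2]bool {
	var l ticketList
	t0, t1 := l.add(), l.add() // tickets 0 and 1
	w1 := l.enqueue(t1)        // ticket 1 reaches the list first
	w0 := l.enqueue(t0)

	var got [2][2]bool
	l.notifyOne()
	got[0] = [2]bool{w0.woken, w1.woken}
	l.notifyOne()
	got[1] = [2]bool{w0.woken, w1.woken}
	return got
}

func main() {
	fmt.Println(signalDemo()) // [[true false] [true true]]
}
```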

Broadcast operation

Broadcast() differs from Signal() in that it wakes up all waiting goroutines, setting notify directly to the current value of wait.

func (c *Cond) Broadcast() {
	c.checker.check()
	runtime_notifyListNotifyAll(&c.notify)
}

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast path: return immediately if no goroutine is waiting
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil
	// Update notify=wait
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// Wake each goroutine in turn; readyWithTime calls goready
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

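The logic is straightforward. Under the lock, it detaches the whole list and sets notify to wait. Outside the lock, it wakes every detached goroutine.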
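Setting notify = wait has an effect beyond the goroutines already in the list, and a small user-space model of notifyListNotifyAll makes it visible. This is a sketch under stated assumptions, not runtime code. The names are hypothetical, a sync.Mutex replaces the runtime lock, the lock-free fast path is omitted, and a woken flag replaces readyWithTime. A ticket taken before Broadcast but not yet enqueued is treated as already notified, while a ticket taken afterwards is not:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// waiter stands in for a sudog.
type waiter struct {
	ticket uint32
	woken  bool // set instead of calling readyWithTime
	next   *waiter
}

// ticketList is a hypothetical user-space model of notifyList.
type ticketList struct {
	wait       uint32     // next ticket to hand out (atomic)
	mu         sync.Mutex // guards notify, head and tail in this model
	notify     uint32     // next ticket to be notified
	head, tail *waiter
}

// less is a wraparound-safe a < b, like the runtime helper.
func less(a, b uint32) bool { return int32(a-b) < 0 }

func (l *ticketList) add() uint32 { return atomic.AddUint32(&l.wait, 1) - 1 }

// enqueue models the start of notifyListWait: an already-notified ticket
// is not queued and counts as woken immediately.
func (l *ticketList) enqueue(t uint32) *waiter {
	l.mu.Lock()
	defer l.mu.Unlock()
	w := &waiter{ticket: t}
	if less(t, l.notify) {
		w.woken = true
		return w
	}
	if l.tail == nil {
		l.head = w
	} else {
		l.tail.next = w
	}
	l.tail = w
	return w
}

// notifyAll models notifyListNotifyAll: detach the list and set
// notify = wait under the lock, then wake the detached waiters.
func (l *ticketList) notifyAll() {
	l.mu.Lock()
	s := l.head
	l.head, l.tail = nil, nil
	l.notify = atomic.LoadUint32(&l.wait)
	l.mu.Unlock()
	for s != nil {
		next := s.next
		s.next = nil
		s.woken = true
		s = next
	}
}

func broadcastDemo() []bool {
	var l ticketList
	a := l.enqueue(l.add()) // ticket 0, queued
	b := l.enqueue(l.add()) // ticket 1, queued
	t := l.add()            // ticket 2, taken but not yet enqueued
	l.notifyAll()           // notify becomes 3
	c := l.enqueue(t)       // 2 < 3: treated as notified, never queued
	d := l.enqueue(l.add()) // ticket 3 was issued after the broadcast
	return []bool{a.woken, b.woken, c.woken, d.woken}
}

func main() {
	fmt.Println(broadcastDemo()) // [true true true false]
}
```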

Conclusion

  1. Once a sync.Cond has been used, it must not be copied. This is enforced by noCopy, which go vet checks, and by copyChecker, which is checked at run time.
  2. Wait() first increments notifyList.wait atomically. It then wraps the goroutine in a sudog, stores the ticket taken from notifyList.wait in sudog.ticket, and appends the sudog to the notifyList linked list.
  3. Signal() picks the goroutine to wake by matching notifyList.notify against the ticket of a node in the notifyList linked list. Both notifyList.notify and notifyList.wait only ever increase, so this gives FIFO semantics.
  4. Broadcast() is relatively simple: it wakes up all waiting goroutines.