1. Core structure

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}

The struct has four members:
1. noCopy is used with go vet for static checks; it marks that a Cond must not be copied after first use.
2. L is a Locker that protects the condition. It must be held while the condition is observed or changed and when Wait is called.
3. notify is a notifyList, which records the waiting goroutines:

// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
type notifyList struct {
	wait   uint32
	notify uint32
	lock   uintptr // key field of the mutex
	head   unsafe.Pointer
	tail   unsafe.Pointer
}

This notifyList mirrors the structure of the same name in runtime/sema.go:

// notifyList is a ticket-based notification list used to implement sync.Cond.
//
// It must be kept in sync with the sync package.
type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait uint32

	// notify is the ticket number of the next waiter to be notified. It can
	// be read outside the lock, but is only written to with lock held.
	//
	// Both wait & notify can wrap around, and such cases will be correctly
	// handled as long as their "unwrapped" difference is bounded by 2^31.
	// For this not to be the case, we'd need to have 2^31+ goroutines
	// blocked on the same condvar, which is currently not possible.
	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

4. checker is a copyChecker. It also guards against copying, but at run time: check() panics if the Cond has been copied after first use.
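The run-time half of the copy protection can be observed from user code. The following is a minimal sketch, assuming the copy is made by value after the Cond has been used once; the function name copiedCondPanics is made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// copiedCondPanics uses a Cond once, copies it by value, and returns the
// panic message raised when the copy is used. It returns "" if no panic
// occurred.
func copiedCondPanics() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()

	c := sync.NewCond(&sync.Mutex{})
	c.Signal() // first use: checker records its own address

	c2 := *c    // value copy; this is the line go vet is meant to flag
	c2.Signal() // checker now lives at a different address, so check() panics
	return ""
}

func main() {
	fmt.Println(copiedCondPanics())
}
```

The static half (noCopy) does not change what the program does at run time; it only gives go vet something to report on the `c2 := *c` line.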

2. Core functions

1, Wait
func (c *Cond) Wait() {
	// Check whether the Cond has been copied.
	c.checker.check()
	// notifyListAdd adds the caller to a notify list such that it can receive
	// notifications. The caller must eventually call notifyListWait to wait for
	// such a notification, passing the returned ticket number.
	// It may be called concurrently, for example when called from
	// sync.Cond.Wait while holding a RWMutex in read mode, which is why the
	// runtime allocates the ticket with atomic.Xadd(&l.wait, 1) - 1.
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// notifyListWait waits for a notification. If one has been sent since
	// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}

Wait relies on two functions in runtime/sema.go. The first is runtime_notifyListAdd–>notifyListAdd

func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}

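As you can see, the function simply adds 1 to wait in the notifyList and returns the value before the increment, which becomes the caller's ticket. The increment is atomic, so it is safe even when Wait is called concurrently, for example by several goroutines that hold an RWMutex in read mode. The Locker is there to protect the condition itself, not the ticket counter. The second function is runtime_notifyListWait–>notifyListWait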

func notifyListWait(l *notifyList, t uint32) {
	lockWithRank(&l.lock, lockRankNotifyList)

	// Return right away if this ticket has already been notified.
	if less(t, l.notify) {
		unlock(&l.lock)
		return
	}

	// Enqueue itself.
	s := acquireSudog()
	s.g = getg()
	s.ticket = t
	s.releasetime = 0
	t0 := int64(0)
	if blockprofilerate > 0 {
		t0 = cputicks()
		s.releasetime = -1
	}
	if l.tail == nil {
		l.head = s
	} else {
		l.tail.next = s
	}
	l.tail = s
	goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
	if t0 != 0 {
		blockevent(s.releasetime-t0, 2)
	}
	releaseSudog(s)
}

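Here wait is the ticket number that will be handed to the next waiter, and notify is the ticket number of the next goroutine to be woken up. If the caller's ticket t is less than notify, that ticket has already been notified and there is no need to block. Otherwise the function gets the current g, stores it together with the assigned ticket in a sudog, appends the sudog to the tail of the linked list, and parks the goroutine with goparkunlock.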
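Because wait and notify are uint32 counters that may wrap around, the "already notified" test cannot be a plain `<`. The sketch below shows a wrap-aware comparison of the kind the runtime's less helper performs; it is written from memory of runtime/sema.go and should be read as an illustration rather than a copy of the current source.

```go
package main

import (
	"fmt"
	"math"
)

// less reports whether ticket a was issued before ticket b. The difference
// is reinterpreted as a signed 32-bit value, so the result stays correct
// across wrap-around as long as the two tickets are less than 2^31 apart.
func less(a, b uint32) bool {
	return int32(a-b) < 0
}

func main() {
	fmt.Println(less(3, 5))              // true: ticket 3 precedes ticket 5
	fmt.Println(less(5, 3))              // false
	fmt.Println(less(math.MaxUint32, 0)) // true: 0 is the ticket issued after the counter wraps
}
```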

// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).

func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
	gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}

func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}

goparkunlock puts the current g into the waiting state and releases the lock. goready makes the g runnable again, which is what happens when it is notified. In Wait, notifyListAdd and notifyListWait are used together. When the first g enters, wait and notify are both 0. notifyListAdd changes wait to 1 and returns the previous value 0, and that 0 is then passed to notifyListWait as the g's ticket.
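The ticket hand-out described above can be reproduced in user code with sync/atomic. This is a minimal sketch, not the runtime's code: takeTicket and uniqueTickets are hypothetical names, and atomic.AddUint32 stands in for the runtime-internal atomic.Xadd.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// takeTicket has the same shape as notifyListAdd: atomically increment the
// counter and return the value it held before the increment.
func takeTicket(wait *uint32) uint32 {
	return atomic.AddUint32(wait, 1) - 1
}

// uniqueTickets lets n goroutines take one ticket each and returns the
// number of distinct tickets that were handed out.
func uniqueTickets(n int) int {
	var wait uint32
	tickets := make([]uint32, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			tickets[i] = takeTicket(&wait)
		}(i)
	}
	wg.Wait()

	seen := make(map[uint32]bool, n)
	for _, t := range tickets {
		seen[t] = true
	}
	return len(seen)
}

func main() {
	var wait uint32
	t := takeTicket(&wait)
	fmt.Println(t, wait) // 0 1: the first waiter gets ticket 0 and wait becomes 1

	fmt.Println(uniqueTickets(100)) // 100: no two goroutines share a ticket
}
```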

2, Signal
func (c *Cond) Signal() {
	c.checker.check()
	// notifyListNotifyOne notifies one entry in the list.
	runtime_notifyListNotifyOne(&c.notify)
}

runtime_notifyListNotifyOne–>notifyListNotifyOne

func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock if we need to do anything.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	atomic.Store(&l.notify, t+1)

	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

If wait and notify are equal, no new g has started waiting since the last notification, so the function returns without taking the lock. Otherwise it acquires the lock and checks wait and notify again, in case another g performed the notification concurrently. It then increments notify by 1, walks the wait list to find the sudog that holds the ticket to be notified, unlinks it, and calls readyWithTime

func readyWithTime(s *sudog, traceskip int) {
	if s.releasetime != 0 {
		s.releasetime = cputicks()
	}
	goready(s.g, traceskip)
}

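After readyWithTime runs, the notified g is runnable again. Once it is scheduled, it returns from notifyListWait and re-acquires c.L before Wait returns.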
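The interplay of the two counters may be easier to follow in a toy model. The sketch below is a hypothetical, single-threaded stand-in for notifyList: ticketList and its methods are invented for illustration, the queue holds plain ticket numbers instead of sudogs, and there is no locking or parking.

```go
package main

import "fmt"

// ticketList models only the bookkeeping of notifyList.
type ticketList struct {
	wait    uint32   // ticket for the next waiter
	notify  uint32   // next ticket to be notified
	waiting []uint32 // tickets of parked waiters, in arrival order
}

// add models notifyListAdd: return the current ticket, then advance wait.
func (l *ticketList) add() uint32 {
	t := l.wait
	l.wait++
	return t
}

// park models notifyListWait: it reports false if the ticket has already
// been notified, otherwise it queues the ticket and reports true.
func (l *ticketList) park(t uint32) bool {
	if int32(t-l.notify) < 0 {
		return false
	}
	l.waiting = append(l.waiting, t)
	return true
}

// notifyOne models notifyListNotifyOne: it returns the ticket it notified,
// or false if no ticket has been taken since the last notification.
func (l *ticketList) notifyOne() (uint32, bool) {
	if l.wait == l.notify {
		return 0, false // fast path: nobody to notify
	}
	t := l.notify
	l.notify++
	for i, w := range l.waiting {
		if w == t {
			l.waiting = append(l.waiting[:i], l.waiting[i+1:]...)
			break
		}
	}
	return t, true
}

func main() {
	var l ticketList

	_, ok := l.notifyOne()
	fmt.Println(ok) // false: wait == notify == 0

	t0 := l.add() // first waiter: ticket 0, wait becomes 1
	t1 := l.add() // second waiter: ticket 1, wait becomes 2
	l.park(t0)
	l.park(t1)

	woken, _ := l.notifyOne()
	fmt.Println(woken, l.wait, l.notify) // 0 2 1: the oldest ticket is notified first
}
```

In the model, a ticket that is notified before it is parked simply makes park report false, which corresponds to notifyListWait returning right away.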

3. Summary

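1. Each Wait takes a ticket, so with one waiter wait = notify + 1; Signal then increments notify so that wait == notify again. A Signal sent while wait == notify is dropped on the fast path. This means the Wait must be registered before the Signal, or else the waiter is never woken and a deadlock can occur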
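The dropped Signal can be shown without relying on timing. The following is a sketch under stated assumptions: lostSignal is a made-up helper, and it uses the fact that the waiter holds c.L until Wait has taken its ticket, so once the main goroutine manages to lock c.L the waiter is known to be registered.

```go
package main

import (
	"fmt"
	"sync"
)

// lostSignal calls Signal before any goroutine waits, then starts a waiter.
// It reports whether the waiter was still blocked after that early Signal.
// A second Signal is sent afterwards so that the waiter can finish.
func lostSignal() (stillBlocked bool) {
	c := sync.NewCond(&sync.Mutex{})
	c.Signal() // no waiter yet: wait == notify, so nothing happens

	locked := make(chan struct{})
	done := make(chan struct{})
	go func() {
		c.L.Lock()
		close(locked)
		c.Wait() // takes a ticket, unlocks c.L, then parks
		c.L.Unlock()
		close(done)
	}()

	<-locked
	// This Lock succeeds only after Wait has unlocked c.L, which happens
	// after the waiter has taken its ticket.
	c.L.Lock()
	c.L.Unlock()

	select {
	case <-done:
	default:
		stillBlocked = true // the early Signal was not remembered
	}

	c.Signal() // this Signal matches the waiter's ticket
	<-done
	return stillBlocked
}

func main() {
	fmt.Println(lostSignal()) // true
}
```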

2. A Cond must be used together with a Locker, which is passed in at initialization

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

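3. Wait must be called with c.L held. Because c.L is not locked while the goroutine is parked, the condition must be checked again in a loop after Wait returns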

c.L.Lock()
for !condition() {
	c.Wait()
}
... make use of condition ...
c.L.Unlock()
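As a fuller illustration of this loop, here is a small sketch of a FIFO queue built on sync.Cond. The queue type and the names put, get and sumViaQueue are hypothetical and not part of the sync package; the sketch assumes one producer and one consumer.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is an unbounded FIFO whose consumers block while it is empty.
type queue struct {
	cond  *sync.Cond
	items []int
}

func newQueue() *queue {
	return &queue{cond: sync.NewCond(&sync.Mutex{})}
}

// put appends an item under the lock and then wakes one waiting consumer,
// if there is one.
func (q *queue) put(v int) {
	q.cond.L.Lock()
	q.items = append(q.items, v)
	q.cond.L.Unlock()
	q.cond.Signal()
}

// get blocks until an item is available and returns it.
func (q *queue) get() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.items) == 0 { // checked before the first Wait and after every wake-up
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

// sumViaQueue sends 1..n through the queue from a second goroutine and
// returns the sum of the values received.
func sumViaQueue(n int) int {
	q := newQueue()
	go func() {
		for i := 1; i <= n; i++ {
			q.put(i)
		}
	}()
	sum := 0
	for i := 0; i < n; i++ {
		sum += q.get()
	}
	return sum
}

func main() {
	fmt.Println(sumViaQueue(10)) // 55
}
```

Because get tests the condition before it ever calls Wait, an item that was put while no consumer was waiting is still picked up, even though the Signal for it found nobody to wake.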