sync.Cond implements a condition variable, used when one goroutine, or a group of them, needs to be woken up after a condition is met. Each Cond is associated with a Locker, usually a *Mutex or *RWMutex, chosen according to the situation.
Basic usage
Before dissecting the source code, let’s look at how sync.Cond is used. Say we implement a FIFO queue:
package main

import (
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "time"
)

type FIFO struct {
    lock  sync.Mutex
    cond  *sync.Cond
    queue []int
}

type Queue interface {
    Pop() int
    Offer(num int) error
}

func (f *FIFO) Offer(num int) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.queue = append(f.queue, num)
    f.cond.Broadcast()
    return nil
}

func (f *FIFO) Pop() int {
    f.lock.Lock()
    defer f.lock.Unlock()
    for len(f.queue) == 0 {
        f.cond.Wait()
    }
    item := f.queue[0]
    f.queue = f.queue[1:]
    return item
}

func main() {
    fifo := &FIFO{queue: []int{}}
    // The Cond must be built on the same mutex that Offer/Pop lock,
    // so pass a pointer to the struct's own lock rather than a copy.
    fifo.cond = sync.NewCond(&fifo.lock)
    go func() {
        for {
            fifo.Offer(rand.Int())
        }
    }()
    time.Sleep(time.Second)
    go func() {
        for {
            fmt.Printf("goroutine1 pop-->%d\n", fifo.Pop())
        }
    }()
    go func() {
        for {
            fmt.Printf("goroutine2 pop-->%d\n", fifo.Pop())
        }
    }()
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}
We have a FIFO queue with Offer and Pop, one goroutine that keeps putting data into the queue, and two goroutines that keep fetching data.
- The Pop operation first checks whether the queue is empty with len(f.queue) == 0; if it is, it calls f.cond.Wait() to suspend the goroutine.
- It waits until an Offer operation successfully puts data in and calls f.cond.Broadcast(), which wakes up all goroutines suspended on this mutex. sync.Cond also provides Signal(), so the pair works somewhat like notify() and notifyAll() in Java: wake one up versus wake all up (see the sketch below).
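A minimal sketch of the difference (the identifiers here are illustrative, not part of the queue example above): three goroutines block on the same Cond; Signal() releases exactly one of them, and Broadcast() releases the rest.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    mu := &sync.Mutex{}
    cond := sync.NewCond(mu)
    ready := false

    for i := 1; i <= 3; i++ {
        go func(id int) {
            mu.Lock()
            for !ready {
                cond.Wait() // parks until a Signal/Broadcast arrives
            }
            mu.Unlock()
            fmt.Printf("goroutine %d woke up\n", id)
        }(i)
    }
    time.Sleep(100 * time.Millisecond) // let all three goroutines park

    mu.Lock()
    ready = true
    mu.Unlock()

    cond.Signal() // wakes exactly one waiter
    time.Sleep(100 * time.Millisecond)

    cond.Broadcast() // wakes the remaining two
    time.Sleep(100 * time.Millisecond)
}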
To summarize the general usage of sync.Cond:
- First declare a mutex; sync.Mutex or sync.RWMutex can be chosen according to the actual situation.
- Call sync.NewCond(l Locker) *Cond, passing the mutex as the argument. Notice that we pass a pointer here: c.L.Lock() and c.L.Unlock() are called frequently, and operating on a copy of the lock would cause deadlocks.
- While the business condition is not met, call cond.Wait() to suspend the goroutine. cond.Broadcast() wakes up all suspended goroutines; alternatively, cond.Signal() wakes up the goroutine that has been waiting the longest.
It should be noted that calls to cond.Wait() should follow this template:
c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()
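The for loop, rather than a plain if, matters: Broadcast() wakes every waiter, but by the time a given goroutine reacquires the lock, another one may already have consumed the condition, so it must re-check and possibly Wait() again. A small sketch of this (produce/consume and the counter are illustrative names):

package main

import (
    "fmt"
    "sync"
)

var (
    mu    sync.Mutex
    cond  = sync.NewCond(&mu)
    items = 0
)

func consume(id int, done chan<- struct{}) {
    mu.Lock()
    // Must be `for`, not `if`: a woken consumer may find that another
    // goroutine already took the item and has to wait again.
    for items == 0 {
        cond.Wait()
    }
    items--
    mu.Unlock()
    fmt.Printf("consumer %d got an item\n", id)
    done <- struct{}{}
}

func produce() {
    mu.Lock()
    items++
    mu.Unlock()
    cond.Broadcast() // wakes every consumer, though only one can succeed
}

func main() {
    done := make(chan struct{})
    go consume(1, done)
    go consume(2, done)

    produce() // both consumers may wake; one consumes, the other waits again
    <-done
    produce() // the second item releases the remaining consumer
    <-done
}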
Source code analysis
The data structure
Before analyzing the specific methods, let’s take a look at the data structure of sync.Cond. The source is as follows:
type Cond struct {
    noCopy noCopy // Cond must not be copied after first use

    // L is held while observing or changing the condition
    L Locker

    // goroutines that call Wait() are placed in the notifyList
    notify  notifyList
    checker copyChecker // detects whether this Cond instance has been copied
}
In addition to Locker, which is the mutex we just talked about, there is copyChecker, used to check whether the Cond instance has been copied. It has a single method:
func (c *copyChecker) check() {
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}
copyChecker is a uintptr that defaults to 0; the first time check() is called, it CASes the checker's own address into itself. The comparison uintptr(*c) != uintptr(unsafe.Pointer(c)) is performed twice because another goroutine may modify copyChecker in between. If the value still doesn't match after the CAS, the sync.Cond has been copied and now lives at a different memory address, so check() panics.
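A quick way to see the checker fire (a minimal sketch; go vet's copylocks check would also flag the copy statically, thanks to noCopy):

package main

import "sync"

func main() {
    c := sync.NewCond(&sync.Mutex{})
    c.Signal() // first use: check() CASes the Cond's own address into checker

    d := *c    // copy the Cond after it has been used
    d.Signal() // checker no longer matches d's address -> panic: sync.Cond is copied
}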
The field of sync.Cond we are really interested in is notifyList:
type notifyList struct {
    // wait is the ticket number of the next waiter. It is atomically
    // incremented outside the lock.
    wait uint32 // number of goroutines that have called Wait()

    // notify is the ticket number of the next waiter to be notified. It can
    // be read outside the lock, but is only written to with lock held.
    //
    // Both wait & notify can wrap around, and such cases will be correctly
    // handled as long as their "unwrapped" difference is bounded by 2^31.
    // For this not to be the case, we'd need to have 2^31+ goroutines
    // blocked on the same condvar, which is currently not possible.
    notify uint32 // number of goroutines that have been woken up

    // List of parked waiters.
    lock mutex
    head *sudog
    tail *sudog
}
It contains three kinds of fields:

- wait and notify are two unsigned integers: wait counts the goroutines that have called Wait(), and notify counts the ones that have been woken up; wait should always be greater than or equal to notify.
- lock mutex: this is not the sync.Mutex exposed to Go developers, but the simplified mutex implemented inside the runtime, the same one we saw in semaRoot when analyzing the semaphore's blocking queue.
- head and tail: as the names suggest, these maintain a linked list of the goroutines currently blocked on this sync.Cond.
Generally speaking, the overall structure of sync.Cond can be sketched roughly like this:
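Cond
├── noCopy / checker         copy protection
├── L Locker                 the user-supplied *sync.Mutex / *sync.RWMutex
└── notify notifyList
    ├── wait / notify        atomically incremented ticket counters
    ├── lock mutex           runtime-internal mutex guarding the list
    └── head → sudog → sudog → … ← tail    the parked goroutines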
Operation methods
Wait() operation
func (c *Cond) Wait() {
    // 1. Check whether the Cond has been copied
    c.checker.check()
    // 2. notifyList.wait + 1
    t := runtime_notifyListAdd(&c.notify)
    // 3. Release the lock, freeing it for other goroutines
    c.L.Unlock()
    // 4. Suspend the goroutine
    runtime_notifyListWait(&c.notify, t)
    // 5. Try to reacquire the lock
    c.L.Lock()
}
The Wait() method takes about five steps:
- Call copyChecker.check() to ensure the sync.Cond has not been copied.
- Each call to Wait() increments the sync.Cond.notifyList.wait field. This is the cornerstone of the FIFO behavior: the wait ticket determines the order in which goroutines are woken up.
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}
- Call c.L.Unlock() to release the lock: the current goroutine is about to gopark, so handing the lock to other goroutines avoids deadlock.
- Call runtime_notifyListWait(&c.notify, t), which is a little more involved:
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
    lockWithRank(&l.lock, lockRankNotifyList)

    // Return immediately if this ticket has already been notified
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
    // Assign the wait ticket number to s.ticket, the basis of the FIFO order
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    // Insert the current goroutine at the tail of the notifyList
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    // Finally call goparkunlock to suspend the current goroutine
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
    // The goroutine has been woken up; release the sudog
    releaseSudog(s)
}
The main tasks are as follows:

- Insert the current goroutine into the notifyList.
- Call gopark to suspend the current goroutine.
- When another goroutine calls the Signal or Broadcast method, the current goroutine is woken up and tries to reacquire the lock (see the sketch below for why the lock must be held around Wait()).
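One consequence of steps 3 and 5: the caller must already hold c.L when calling Wait(), because Wait() unlocks it before parking and relocks it after waking. A minimal sketch of getting this wrong (with a *sync.Mutex, the program crashes on the internal Unlock):

package main

import "sync"

func main() {
    c := sync.NewCond(&sync.Mutex{})
    // Wait() immediately calls c.L.Unlock() (step 3), so calling it
    // without holding the lock crashes with
    // "sync: unlock of unlocked mutex".
    c.Wait()
}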
Signal operation
Signal wakes up the goroutine that has been waiting the longest; the caller is not required to hold the lock.
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}
The implementation is not complicated: check whether sync.Cond has been copied, then call runtime_notifyListNotifyOne:
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
    // wait == notify means no goroutine is waiting
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lockWithRank(&l.lock, lockRankNotifyList)

    // Re-check under the lock
    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    // Update the ticket number to be woken next
    atomic.Store(&l.notify, t+1)

    // Try to find the g that needs to be notified.
    // If it hasn't made it to the list yet we won't find it,
    // but it won't park itself once it sees the new notify number.
    //
    // This scan looks linear but essentially always stops quickly.
    // Because g's queue separately from taking numbers,
    // there may be minor reorderings in the list, but we
    // expect the g we're looking for to be near the front.
    // The g has others in front of it on the list only to the
    // extent that it lost the race, so the iteration will not
    // be too long. This applies even when the g is missing:
    // it hasn't yet gotten to sleep and has lost the race to
    // the (few) other g's that we find on the list.
    //
    // The core of the FIFO behavior: walk the list matching
    // sudog.ticket to find the node to wake up
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}
Main logic:

- If there is no goroutine waiting to be woken up, return directly.
- Increment the notify field. The goroutine to wake is found by matching notify against sudog.ticket, and because both are generated by atomic increments, this yields FIFO semantics.
- Iterate over the linked list held by notifyList, starting from head and following the next pointers. The scan is linear, so O(n) in theory, but as the official comment says: "This scan looks linear but essentially always stops quickly."
One small detail: recall that in the Wait() operation, atomically incrementing the wait field and inserting the goroutine into the wait list are two separate steps, so under a race the nodes in the list can be slightly out of order. This is harmless: tickets are assigned by atomic increment, so the wakeup order is never mixed up (see the sketch below).
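The FIFO guarantee can be observed from user code. In this rough sketch, the channel handshake is only there to force the Wait() calls, and hence the tickets, into a known order; three waiters are then released by Signal() in the order they started waiting.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    mu := &sync.Mutex{}
    cond := sync.NewCond(mu)
    parked := make(chan struct{})

    // Start three waiters strictly one after another, so their tickets
    // are taken in the order 0, 1, 2: each new goroutine can only grab
    // the mutex after the previous one's Wait() has released it.
    for i := 0; i < 3; i++ {
        go func(id int) {
            mu.Lock()
            parked <- struct{}{}
            cond.Wait() // ticket assigned here, while holding the lock
            fmt.Printf("waiter %d woken\n", id)
            mu.Unlock()
        }(i)
        <-parked
    }

    // Signal three times: the waiters come back in ticket (FIFO) order.
    for i := 0; i < 3; i++ {
        time.Sleep(50 * time.Millisecond) // let the previous waiter park
        cond.Signal()
    }
    time.Sleep(100 * time.Millisecond)
}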
Broadcast operation
Broadcast() differs from Signal() mainly in that it wakes up all waiting goroutines, assigning the wait value directly to notify.
func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
    // Fast path: return directly if no goroutine is waiting
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil

    // Update notify = wait
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // Walk the detached list, calling readyWithTime to wake each goroutine
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}
The logic is simple and needs no further explanation.
Conclusion

- Once created and used, sync.Cond must not be copied; this is enforced by noCopy and copyChecker.
- Wait() first increments notifyList.wait, then wraps the goroutine into a sudog, assigns the wait ticket to sudog.ticket, and inserts the sudog into the notifyList linked list.
- Signal() decides which goroutine to wake by matching notifyList.notify against the ticket of the nodes in the notifyList linked list; because notifyList.notify and notifyList.wait are both atomically incremented, this yields FIFO semantics.
- Broadcast() is comparatively simple: it wakes up all waiting goroutines.