A Mutex (mutual-exclusion lock) lets coroutines take turns accessing a shared resource. This article splits the Mutex source code into three parts and explains how Mutex works from three angles: “mutual exclusion”, “giving running coroutines a better chance of acquiring the lock”, and “solving the starvation problem”.
Mutex
A Mutex gives coroutines mutually exclusive access to a resource. It provides two methods, Lock() and Unlock(). Lock() competes for the lock, and several coroutines may call Lock() at the same time. Unlock() releases the lock and wakes up a coroutine in the wait queue. In normal use (as shown in the code), only the coroutine that has entered the critical section calls Unlock(), so at most one coroutine executes Unlock() at any given time.
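import (
	"fmt"
	"sync"
)

var lock sync.Mutex

// Hello1 releases the lock explicitly after the critical section.
func Hello1() {
	lock.Lock()
	fmt.Println("Hello")
	lock.Unlock()
}

// Hello2 releases the lock with defer, so it is released on every return path.
func Hello2() {
	lock.Lock()
	defer lock.Unlock()
	fmt.Println("Hello")
}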
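To see the mutual exclusion in action, here is a minimal runnable sketch. The helper name countWithMutex is made up for this illustration; everything else is the standard sync API. Several coroutines increment one shared counter, and the Mutex ensures that no increment is lost.

```go
package main

import (
	"fmt"
	"sync"
)

// countWithMutex (hypothetical helper) starts `workers` goroutines that each
// increment a shared counter `iters` times, guarding every increment with a
// sync.Mutex.
func countWithMutex(workers, iters int) int {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		counter int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < iters; i++ {
				mu.Lock()
				counter++ // critical section: one goroutine at a time
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithMutex(8, 1000)) // prints 8000
}
```

Without the Lock()/Unlock() pair around counter++, the increments would race and the result would usually be lower than 8000.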
Wait queue
When a coroutine calls Lock() and fails to acquire the lock, it enters the wait queue and switches to the blocked state. When the coroutine holding the lock calls Unlock() to release it, a waiting coroutine is woken from the wait queue. The wait queue of Mutex is implemented with a balanced tree; its source is in $GOROOT/src/runtime/sema.go. The wait queue uses the semaphore sema to tell different mutexes apart, and it supports both FIFO and LIFO queuing.
// Enter the queue and block the current coroutine; lifo=true queues it at the head, lifo=false queues it at the tail
func runtime_SemacquireMutex(sema *uint32, lifo bool, skipframes int)
// Wake up a coroutine from the wait queue
func runtime_Semrelease(sema *uint32, handoff bool, skipframes int)
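The difference between FIFO and LIFO queuing can be sketched with a tiny model. This is only an illustration under simplifying assumptions: the waitQueue type and its methods are hypothetical, and a plain slice stands in for the balanced tree used by the runtime.

```go
package main

import "fmt"

// waitQueue is a hypothetical slice-backed model of a wait queue.
type waitQueue struct {
	waiters []string
}

// enqueue adds a waiter at the head (lifo=true) or at the tail (lifo=false).
func (q *waitQueue) enqueue(name string, lifo bool) {
	if lifo {
		q.waiters = append([]string{name}, q.waiters...)
		return
	}
	q.waiters = append(q.waiters, name)
}

// dequeue removes and returns the waiter at the head; ok is false when the
// queue is empty.
func (q *waitQueue) dequeue() (name string, ok bool) {
	if len(q.waiters) == 0 {
		return "", false
	}
	name = q.waiters[0]
	q.waiters = q.waiters[1:]
	return name, true
}

func main() {
	var q waitQueue
	q.enqueue("A", false) // tail
	q.enqueue("B", false) // tail
	q.enqueue("C", true)  // head: C jumps ahead of A and B
	for {
		name, ok := q.dequeue()
		if !ok {
			break
		}
		fmt.Println(name) // wake-up order: C, A, B
	}
}
```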
Source code analysis
On top of mutual exclusion, Mutex adds two features: it increases the chance that a running coroutine acquires the lock, and it solves the starvation problem. The following sections break Mutex down along these three aspects: “mutual exclusion”, “increasing the chance that a running coroutine acquires the lock”, and “solving the starvation problem”.
Mutex structure
Mutex has two fields: state and sema. The state field holds the lock state. The sema semaphore can be understood as a unique identifier of the mutex; it is used to put coroutines into the wait queue and to wake them up from it.
type Mutex struct {
	state int32  // State of the lock (number of waiting coroutines, starvation flag, whether a running coroutine is competing, whether the lock is held)
	sema  uint32 // Semaphore used to park coroutines in the wait queue and wake them up
}
const (
	// Locked flag (lowest bit of state)
	// Mutex.state&mutexLocked != 0 means the lock is held; Mutex.state&mutexLocked == 0 means it is not held
	mutexLocked = 1 << iota
	// Woken flag: whether a running coroutine is competing for the lock (second-lowest bit of state)
	// Mutex.state&mutexWoken != 0 means there is such a coroutine; Mutex.state&mutexWoken == 0 means there is none
	mutexWoken
	// Starvation flag (third-lowest bit of state)
	// Mutex.state&mutexStarving != 0 means the lock is in the starvation state; Mutex.state&mutexStarving == 0 means it is not
	mutexStarving
	// Shift of the waiter count (value 3)
	// Mutex.state >> mutexWaiterShift is the number of coroutines blocked in the wait queue, i.e. how many coroutines are blocked on this lock
	mutexWaiterShift = iota
)
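The bit layout is easier to follow with a concrete value. The sketch below reuses the constants from the article; the lockState type and the decode function are hypothetical helpers introduced only for this illustration.

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota // 1
	mutexWoken              // 2
	mutexStarving           // 4
	mutexWaiterShift = iota // 3
)

// lockState is a hypothetical, human-readable view of the state field.
type lockState struct {
	Locked, Woken, Starving bool
	Waiters                 int32
}

// decode splits a raw state value into its three flags and the waiter count.
func decode(state int32) lockState {
	return lockState{
		Locked:   state&mutexLocked != 0,
		Woken:    state&mutexWoken != 0,
		Starving: state&mutexStarving != 0,
		Waiters:  state >> mutexWaiterShift,
	}
}

func main() {
	// Lock held with two waiters: 2<<3 | 1 = 17.
	s := int32(2<<mutexWaiterShift | mutexLocked)
	fmt.Printf("%d -> %+v\n", s, decode(s))
}
```

Note that the flags must be tested with != 0: only mutexLocked has the value 1, so a comparison with 1 would be wrong for mutexWoken and mutexStarving.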
Mutual exclusion
Mutual exclusion is implemented by the mutexLocked flag bit. When mutexLocked is 1, the lock is held and other coroutines have to queue. When mutexLocked is 0, the lock is not held, and a coroutine can acquire it by setting the mutexLocked flag bit to 1 with a CAS operation.
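As a small illustration of the CAS step, the sketch below makes a single acquisition attempt on a bare int32. The tryAcquire helper is hypothetical, and it only handles the simplest case in which the whole state word is 0.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const mutexLocked = 1

// tryAcquire (hypothetical helper) makes one CAS attempt to flip the state
// from "free" (0) to "held" (mutexLocked). It never blocks.
func tryAcquire(state *int32) bool {
	return atomic.CompareAndSwapInt32(state, 0, mutexLocked)
}

func main() {
	var state int32
	fmt.Println(tryAcquire(&state))       // true: 0 -> mutexLocked
	fmt.Println(tryAcquire(&state))       // false: the lock is already held
	atomic.AddInt32(&state, -mutexLocked) // release
	fmt.Println(tryAcquire(&state))       // true: the lock is free again
}
```

Because the compare and the swap happen as one atomic step, two coroutines that attempt this at the same moment cannot both succeed.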
The Lock() method flows as follows:
- Read the lock state;
- Update the lock state with a CAS operation. If the CAS operation fails, go back to step 1; if it succeeds, go to step 3;
- Determine whether the current coroutine has acquired the lock:
  - Success -> exit the Lock() method and execute the critical-section code;
  - Failure -> enter the wait queue and wait to be woken by the coroutine holding the lock; after waking up, go back to step 1.
// Lock competes for the lock
func (m *Mutex) Lock() {
	// Fast path: the lock is not held by any coroutine, so it is acquired immediately
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// Read the lock state
	old := m.state
	for {
		new := old
		// The new state always has the locked bit set
		new |= mutexLocked
		// If the lock is already held, increment the waiter count
		if old&mutexLocked != 0 {
			new += 1 << mutexWaiterShift
		}
		// CAS operation
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// The lock was not held before, so the current coroutine has acquired it; exit the for loop
			if old&mutexLocked == 0 {
				break
			}
			// The lock is held by another coroutine; the current coroutine enters the wait queue
			runtime_SemacquireMutex(&m.sema, false, 1)
			// After being woken from the wait queue, re-read the lock state
			// (plain assignment: ":=" would shadow old and the loop would never see the new state)
			old = m.state
		} else {
			// CAS failed; re-read the state and compete again
			old = m.state
		}
	}
}
The Unlock() method does the following:
- Switch the lock to the unheld state with the atomic operation atomic.AddInt32(&m.state, -mutexLocked);
- Determine whether a coroutine in the wait queue needs to be woken up. No coroutine is woken when the lock has already been acquired again or when there are no waiters. Otherwise, a CAS operation decrements the waiter count and one coroutine is woken from the wait queue.
// Unlock releases the lock
func (m *Mutex) Unlock() {
	// Set the lock to the unheld state
	new := atomic.AddInt32(&m.state, -mutexLocked)
	// Fast path: no coroutine is waiting for the lock
	if new == 0 {
		return
	}
	// Coroutines are waiting for the lock and one of them needs to be woken up
	old := new
	for {
		// No waiters -> there is nothing to wake
		// Lock already held again -> the coroutine that grabbed it will wake a waiter in its own Unlock()
		if old>>mutexWaiterShift == 0 || old&mutexLocked != 0 {
			return
		}
		// Decrement the number of coroutines waiting for the lock
		new = (old - 1<<mutexWaiterShift)
		if atomic.CompareAndSwapInt32(&m.state, old, new) { // CAS operation
			// Wake up one coroutine waiting for the lock
			runtime_Semrelease(&m.sema, false, 1)
			return
		}
		old = m.state
	}
}
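The Lock()/Unlock() pair above depends on runtime functions that user code cannot call. The following is a runnable sketch of the same idea under stated assumptions: ToyMutex, its constants, and countWithToyMutex are hypothetical names, the state word only carries the locked bit and the waiter count, and a buffered channel stands in for the runtime semaphore. It is a teaching model, not a replacement for sync.Mutex.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	toyLocked      = 1 // bit 0: the lock is held
	toyWaiterShift = 1 // the waiter count sits above bit 0 in this toy layout
)

// ToyMutex is a hypothetical teaching type, not the real sync.Mutex.
type ToyMutex struct {
	state int32
	sema  chan struct{} // assumption: a channel replaces the runtime semaphore
}

func NewToyMutex() *ToyMutex {
	// The buffer must be at least as large as the number of goroutines
	// that can wait at once, so that Unlock never blocks on the send.
	return &ToyMutex{sema: make(chan struct{}, 1024)}
}

func (m *ToyMutex) Lock() {
	for {
		old := atomic.LoadInt32(&m.state)
		next := old | toyLocked
		if old&toyLocked != 0 {
			next += 1 << toyWaiterShift // lock is held: register as a waiter
		}
		if !atomic.CompareAndSwapInt32(&m.state, old, next) {
			continue // the state changed in the meantime: retry
		}
		if old&toyLocked == 0 {
			return // the lock was free and now belongs to us
		}
		<-m.sema // block until an Unlock hands out a wake-up token
	}
}

func (m *ToyMutex) Unlock() {
	old := atomic.AddInt32(&m.state, -toyLocked)
	for {
		if old>>toyWaiterShift == 0 || old&toyLocked != 0 {
			return // nobody to wake, or the lock was already grabbed again
		}
		if atomic.CompareAndSwapInt32(&m.state, old, old-1<<toyWaiterShift) {
			m.sema <- struct{}{} // wake exactly one waiter
			return
		}
		old = atomic.LoadInt32(&m.state)
	}
}

// countWithToyMutex increments a shared counter from several goroutines.
func countWithToyMutex(workers, iters int) int {
	m := NewToyMutex()
	var wg sync.WaitGroup
	counter := 0
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < iters; i++ {
				m.Lock()
				counter++
				m.Unlock()
			}
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countWithToyMutex(8, 1000)) // prints 8000
}
```

A woken waiter does not own the lock yet: it loops, competes again, and re-registers as a waiter if another coroutine was faster. This is exactly the behavior that the next two sections refine.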
Increasing the chance that a running coroutine acquires the lock
Waking a coroutine from the wait queue requires a context switch. To reduce this cost, Mutex is designed to give a coroutine that is already running a better chance of acquiring the lock. This is done with the mutexWoken flag bit: when mutexWoken is 1, a running coroutine is currently competing for the lock, and in that case the Unlock() method does not wake a coroutine from the wait queue.
The Lock() method flows as follows:
- Read the lock state;
- While spinning, set the mutexWoken flag bit (this tells the Unlock() method that a running coroutine is already competing for the lock, so it should not wake another coroutine);
- Update the lock state with a CAS operation. If the CAS operation fails, go back to step 1; if it succeeds, go to step 4;
- Determine whether the current coroutine has acquired the lock:
  - Success -> exit the Lock() method and execute the critical-section code;
  - Failure -> enter the wait queue and wait to be woken by the coroutine holding the lock; after waking up, go back to step 1.
func (m *Mutex) Lock() {
	// Fast path: the lock is not held by any coroutine, so it is acquired immediately
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// awoke records whether the mutexWoken flag in state was set on behalf of the current coroutine
	awoke := false
	// Iteration counter, used to decide whether spinning is still allowed
	iter := 0
	old := m.state
	for {
		// While the lock is held by another coroutine, spin and try to set the mutexWoken flag bit
		// Once mutexWoken is set, Unlock() no longer wakes blocked coroutines, which reduces the cost of context switching
		if old&mutexLocked == mutexLocked && runtime_canSpin(iter) {
			// If mutexWoken is not set and the waiter count is not 0, set the mutexWoken flag bit with a CAS operation
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				// mutexWoken was set successfully; awoke = true records that the current coroutine set it
				awoke = true
			}
			runtime_doSpin()
			// Number of spin iterations +1
			iter++
			old = m.state
			continue
		}
		new := old
		// The new state always has the locked bit set (as in the first version of Lock())
		new |= mutexLocked
		// The lock is already held by another coroutine, so increment the waiter count
		if old&mutexLocked != 0 {
			new += 1 << mutexWaiterShift
		}
		// If mutexWoken was set for the current coroutine, clear the mutexWoken flag bit
		if awoke {
			new &^= mutexWoken
		}
		// CAS operation to acquire the lock
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// The lock was not held before, so the current coroutine has acquired it; exit the for loop
			if old&mutexLocked == 0 {
				break
			}
			// The current coroutine blocks and enters the wait queue
			runtime_SemacquireMutex(&m.sema, false, 1)
			// After being woken from the wait queue, re-read the lock state
			old = m.state
			// Unlock() set the mutexWoken flag when it woke this coroutine, so awoke is set to true
			awoke = true
			// Reset the spin iteration counter of the woken coroutine
			iter = 0
		} else {
			old = m.state
		}
	}
}
The Unlock() method does the following:
- Switch the lock to the unheld state with the atomic operation atomic.AddInt32(&m.state, -mutexLocked);
- Determine whether a coroutine in the wait queue needs to be woken up. No coroutine is woken when “the lock has already been acquired again”, “there are no waiters”, or “a running coroutine is already competing for the lock”. Otherwise, a CAS operation updates the state and one coroutine is woken from the wait queue.
func (m *Mutex) Unlock() {
	// Set the lock to the unheld state
	new := atomic.AddInt32(&m.state, -mutexLocked)
	// Fast path: no coroutine is waiting for the lock
	if new == 0 {
		return
	}
	old := new
	for {
		// No waiters -> there is nothing to wake
		// Lock already held again -> the coroutine that grabbed it will wake a waiter in its own Unlock()
		// A running coroutine is already competing for the lock -> no need to wake a waiter, which saves a context switch
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
			return
		}
		// Decrement the waiter count and set mutexWoken to mark that a running coroutine (the one about to be woken) is competing for the lock
		new = (old - 1<<mutexWaiterShift) | mutexWoken
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			runtime_Semrelease(&m.sema, false, 1)
			return
		}
		old = m.state
	}
}
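The wake-up decision in Unlock() can be examined in isolation. The sketch below is a pure-function model; wakeDecision is a hypothetical name, and the function only computes what Unlock() would try to install with its CAS, without touching any real lock.

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// wakeDecision (hypothetical) takes the state observed after the locked bit
// was cleared and returns the state Unlock() would try to install, plus
// whether a waiter would be woken.
func wakeDecision(old int32) (next int32, wake bool) {
	if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
		return old, false
	}
	return (old - 1<<mutexWaiterShift) | mutexWoken, true
}

func main() {
	cases := []int32{
		0,                                 // no waiters
		2 << mutexWaiterShift,             // two waiters, nobody running
		2<<mutexWaiterShift | mutexWoken,  // two waiters, a spinner set mutexWoken
		1<<mutexWaiterShift | mutexLocked, // one waiter, lock already re-acquired
	}
	for _, old := range cases {
		next, wake := wakeDecision(old)
		fmt.Printf("old=%05b next=%05b wake=%v\n", old, next, wake)
	}
}
```

Only the second case wakes a waiter: the waiter count drops from 2 to 1 and mutexWoken is set on behalf of the woken coroutine, which later clears it in Lock().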
Starvation
If running coroutines are always given the chance to acquire the lock, coroutines that are already in the wait queue may never get it, and they starve. Mutex decides whether to enter the starvation state based on how long a coroutine has waited. Once the lock is in the starvation state, newly arriving coroutines no longer compete for the lock but go straight to the tail of the wait queue, which gives the older coroutines a chance to acquire it. When the starving coroutines at the head of the wait queue have all acquired the lock, the lock leaves the starvation state.
The Lock() method flows as follows:
- Read the lock state;
- Determine whether the lock is in the starvation state (mutexStarving):
  - Not starving: set the mutexWoken flag while spinning;
  - Starving: do not update the mutexWoken flag; increment the waiter count (the bits above mutexWaiterShift) and do not compete for the lock;
- Update the lock state with a CAS operation. If the CAS operation fails, go back to step 1; if it succeeds, go to step 4;
- Determine whether the current coroutine has acquired the lock:
  - Success -> exit the Lock() method and execute the critical-section code;
  - Failure -> determine whether the coroutine is competing for the lock for the first time:
    - First time: record the time of the first attempt in waitStartTime and enter the tail of the wait queue;
    - Not the first time: enter the head of the wait queue;
  - When the coroutine is woken from the wait queue, it determines whether it has to switch to the starving state. If the lock is already in the starvation state, the woken coroutine no longer competes for the lock with a CAS operation but acquires it directly with an atomic add (this is safe because only one woken coroutine exists at a time).
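The queuing rule and the starvation check from the steps above can be sketched as two small functions. The waiter type and its methods are hypothetical names for this illustration, and the threshold value of 1e6 nanoseconds (1 ms) is an assumption of this sketch; the article's code only refers to it by the name starvationThresholdNs.

```go
package main

import "fmt"

// Assumed value for this sketch: 1 ms, expressed in nanoseconds.
const starvationThresholdNs = int64(1e6)

// waiter is a hypothetical record of one coroutine's waiting history.
type waiter struct {
	waitStartTime int64 // 0 means "has not waited yet"
	starving      bool
}

// beforeSleep decides where to queue: the first wait goes to the tail
// (queueLifo=false), every later wait goes to the head (queueLifo=true).
// It also records the time of the first wait.
func (w *waiter) beforeSleep(now int64) (queueLifo bool) {
	queueLifo = w.waitStartTime != 0
	if w.waitStartTime == 0 {
		w.waitStartTime = now
	}
	return queueLifo
}

// afterWake marks the waiter as starving once its total waiting time
// exceeds the threshold. The flag is sticky.
func (w *waiter) afterWake(now int64) {
	w.starving = w.starving || now-w.waitStartTime > starvationThresholdNs
}

func main() {
	w := &waiter{}
	fmt.Println(w.beforeSleep(100))    // false: first wait, tail of the queue
	w.afterWake(500100)                // waited 0.5 ms so far
	fmt.Println(w.starving)            // false
	fmt.Println(w.beforeSleep(500200)) // true: woken before, head of the queue
	w.afterWake(1500000)               // about 1.5 ms since the first wait
	fmt.Println(w.starving)            // true
}
```

The waiting time is always measured from the first attempt, so a coroutine that repeatedly loses the race after being woken eventually crosses the threshold.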
func (m *Mutex) Lock() {
	// Fast path: the lock is not held by any coroutine, so it is acquired immediately
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// waitStartTime records the time when the coroutine first competed for the lock
	var waitStartTime int64
	// Whether the current coroutine is starving
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Spin and set mutexWoken only while the lock is held and not in the starvation state
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// ...
		}
		new := old
		// In the starvation state the current coroutine does not try to acquire the lock
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		// If the lock is held or in the starvation state, the current coroutine will wait: waiter count +1
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// Switch the lock to the starvation state
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			new &^= mutexWoken
		}
		// CAS operation
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// mutexLocked: the lock was not held before -> the lock is acquired, break
			// mutexStarving: the lock was in the starvation state -> the lock is not acquired, and the coroutine enters the wait queue
			if old&(mutexLocked|mutexStarving) == 0 {
				break
			}
			// If waitStartTime is 0, the coroutine is competing for the lock for the first time
			// If waitStartTime is not 0, the coroutine was woken from the wait queue and is competing again; queueLifo is true and it enters the head of the wait queue (LIFO, last in first out)
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// Enter the wait queue; the coroutine blocks
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// After being woken, decide from the waiting time whether the coroutine is starving
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			// The lock is in the starvation state, so the woken coroutine takes the lock directly
			if old&mutexStarving != 0 {
				// mutexLocked: set the locked bit
				// -1<<mutexWaiterShift: waiter count -1 (see the expression below)
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// !starving: the coroutines ahead in the wait queue were starving, but the current coroutine is not, so the starvation flag must be cleared
				// old>>mutexWaiterShift == 1: the current coroutine is the last one in the queue, so the starvation flag must be cleared. In the starvation state, running coroutines go straight into the wait queue and only a woken coroutine can acquire the lock. If the flag were not cleared once the queue is empty, later coroutines would all enter the wait queue while no coroutine would call Unlock(), and every coroutine would stay blocked.
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}
}
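The delta applied by a coroutine that is woken in the starvation state is easier to check with concrete numbers. The sketch below reuses the article's constants; handoffDelta is a hypothetical helper that isolates the arithmetic and does not touch a real lock.

```go
package main

import "fmt"

const (
	mutexLocked = 1 << iota // 1
	mutexWoken              // 2
	mutexStarving           // 4
	mutexWaiterShift = iota // 3
)

// handoffDelta (hypothetical) returns the value that a coroutine woken in
// the starvation state adds to state: set the locked bit, decrement the
// waiter count, and clear the starvation flag when the coroutine is not
// starving itself or is the last waiter.
func handoffDelta(starving bool, old int32) int32 {
	delta := int32(mutexLocked - 1<<mutexWaiterShift) // 1 - 8 = -7
	if !starving || old>>mutexWaiterShift == 1 {
		delta -= mutexStarving
	}
	return delta
}

func main() {
	// Three waiters, starvation flag set, locked bit clear: 3<<3 | 4 = 28.
	old := int32(3<<mutexWaiterShift | mutexStarving)
	fmt.Println(old + handoffDelta(true, old)) // 21: two waiters, starving, locked

	// Last waiter: 1<<3 | 4 = 12.
	old = int32(1<<mutexWaiterShift | mutexStarving)
	fmt.Println(old + handoffDelta(true, old)) // 1: locked, no waiters, flag cleared
}
```

A single atomic add is enough here because newly arriving coroutines never set the locked bit while the starvation flag is set, so nothing else can take the lock between the wake-up and the add.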
The Unlock() method does the following:
- Switch the lock to the unheld state with the atomic operation atomic.AddInt32(&m.state, -mutexLocked);
- Determine whether the lock is in the starvation state:
  - Not starving: determine whether a coroutine in the wait queue needs to be woken up. No coroutine is woken when “the lock has already been acquired again”, “there are no waiters”, “a running coroutine is already competing for the lock”, or “the lock has entered the starvation state”. Otherwise, a CAS operation updates the state and one coroutine is woken from the wait queue.
  - Starving: wake the coroutine at the head of the wait queue directly.
func (m *Mutex) Unlock() {
	// Set the lock to the unheld state
	new := atomic.AddInt32(&m.state, -mutexLocked)
	// Fast path: no coroutine is waiting for the lock
	if new == 0 {
		return
	}
	if new&mutexStarving == 0 {
		// Not in the starvation state
		// The coroutine woken by Unlock() has to compete with running coroutines, so a CAS operation is used to check the lock state
		old := new
		for {
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// The lock is in the starvation state: wake the first coroutine in the wait queue
		// In the starvation state, coroutines that have competed before are at the head of the queue, and coroutines competing for the first time are at the tail
		runtime_Semrelease(&m.sema, true, 1)
	}
}
Finally
The source code of Mutex is in $GOROOT/src/sync/mutex.go. To explain the principle of Mutex more clearly, the code in this article is a trimmed-down version of the Mutex source code.