In the Go standard library, the sync/atomic package exposes the atomic memory operations provided by the underlying hardware as Go functions.
A mutex is implemented in software, by the Go runtime with help from the operating system when a goroutine has to block, whereas the operations in the atomic package are supported directly by the hardware. Certain instructions in the CPU's instruction set are wrapped by the atomic package, and these instructions cannot be interrupted while they execute. Atomic operations can therefore guarantee concurrency safety without locks, and they generally scale better than locks as the number of CPUs grows.
Supported data types
- int32
- int64
- uint32
- uint64
- uintptr
- unsafe.Pointer
Operation types
- Add or subtract: AddXXX
	*addr += delta
	return *addr
- Compare and swap: CompareAndSwapXXX
	if *addr == old {
		*addr = new
		return true
	}
	return false
- Load: LoadXXX
	return *addr
- Store: StoreXXX
	*addr = val
- Swap: SwapXXX
	old = *addr
	*addr = new
	return old
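The five operation families listed above can be tried on a single int64. This is a minimal sketch; the function name demoOps and the concrete numbers are chosen only for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// demoOps applies each operation family once to the same int64 and
// returns the values observed along the way.
func demoOps() (added int64, swapped bool, loaded int64, old int64, final int64) {
	var n int64 = 10

	added = atomic.AddInt64(&n, 5)                   // n: 10 -> 15, returns the new value
	swapped = atomic.CompareAndSwapInt64(&n, 15, 20) // n == 15, so n becomes 20
	loaded = atomic.LoadInt64(&n)                    // reads 20
	atomic.StoreInt64(&n, 30)                        // n becomes 30
	old = atomic.SwapInt64(&n, 40)                   // returns 30, n becomes 40
	final = atomic.LoadInt64(&n)                     // reads 40
	return
}

func main() {
	added, swapped, loaded, old, final := demoOps()
	fmt.Println(added, swapped, loaded, old, final) // prints: 15 true 20 30 40
}
```

Note that AddXXX returns the new value while SwapXXX returns the old one.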
What operations are called atomic operations?
Atomicity is the property that one or more operations are not interrupted while the CPU executes them. To the outside world these operations look like a single indivisible unit: either all of them are performed or none of them are, and nobody can observe them half done. In practice a CPU cannot execute an arbitrary series of operations without interruption, but if the intermediate states stay invisible while the operations run, we can still say that they are "indivisible", that is, atomic.
In Go, an ordinary assignment statement is not necessarily atomic. For example, writing an int64 variable on a 32-bit machine has an intermediate state, because the write is split into two MOV instructions: one for the low 32 bits and one for the high 32 bits.
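One way to see the "no intermediate state" guarantee is to have a writer flip an int64 between two bit patterns while a reader checks that it never observes anything else. The sketch below assumes a hypothetical helper, tornReads; with atomic.StoreInt64 and atomic.LoadInt64 the count of half-written values should always be zero.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tornReads has one goroutine alternate a shared int64 between 0 and -1
// (all 64 bits clear, all 64 bits set) while the caller reads it.
// It returns how many reads saw any other value, i.e. a half-written word.
func tornReads(iterations int) int {
	var shared int64
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			if i%2 == 0 {
				atomic.StoreInt64(&shared, -1)
			} else {
				atomic.StoreInt64(&shared, 0)
			}
		}
	}()

	torn := 0
	for i := 0; i < iterations; i++ {
		if v := atomic.LoadInt64(&shared); v != 0 && v != -1 {
			torn++
		}
	}
	wg.Wait()
	return torn
}

func main() {
	fmt.Println("torn reads:", tornReads(1000000))
}
```

Replacing the atomic calls with plain reads and writes would turn this into a data race, and on a 32-bit machine the reader could then see a value whose two halves come from different writes.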
How does this compare with a lock?
Atomic operations are supported directly by the hardware, while locks are implemented in software by the runtime and rely on the scheduler when a goroutine has to wait. Use a lock to protect a section of logic. To protect the update of a single variable, an atomic operation is usually more efficient and makes better use of multiple cores. To update a composite object, use atomic.Value.
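The division of labour described above might look like the following sketch. The stats type and its fields are hypothetical: a single counter is updated atomically, while two fields that must change together are guarded by a mutex.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// stats is a hypothetical type used only for this illustration.
type stats struct {
	hits int64 // single variable: updated with atomic.AddInt64

	mu     sync.Mutex // protects lo, hi and seen, which change together
	lo, hi int
	seen   bool
}

// record counts the sample atomically and updates the range under the lock.
func (s *stats) record(v int) {
	atomic.AddInt64(&s.hits, 1)

	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.seen || v < s.lo {
		s.lo = v
	}
	if !s.seen || v > s.hi {
		s.hi = v
	}
	s.seen = true
}

// run records the values 1..n from n goroutines and returns the totals.
func run(n int) (hits int64, lo, hi int) {
	var s stats
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			s.record(v)
		}(i)
	}
	wg.Wait() // all writes happen before Wait returns
	return atomic.LoadInt64(&s.hits), s.lo, s.hi
}

func main() {
	fmt.Println(run(100)) // prints: 100 1 100
}
```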
Operations on value types
- If one thread has written the low 32 bits and another thread reads the variable before the high 32 bits are written, the reader gets a meaningless intermediate value, which can cause strange bugs in the program.
// When the value is modified frequently, a CAS operation can easily fail,
// so retry in a for loop until it succeeds.
var value int32

func addValue1(delta int32) {
	for {
		// value may be read and written at the same time, so this plain
		// read can observe half-modified data.
		v := value
		if atomic.CompareAndSwapInt32(&value, v, v+delta) {
			break
		}
	}
}
- Use a Load function so that half-written data is never read
func addValue2(delta int32) {
	for {
		// Read the current value atomically.
		v := atomic.LoadInt32(&value)
		if atomic.CompareAndSwapInt32(&value, v, v+delta) {
			// Exit the loop once the swap succeeds.
			break
		}
	}
}
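For plain addition, atomic.AddInt32 already does the whole job in one call. The load-then-CAS retry loop becomes useful for updates that have no dedicated function. As a sketch under that assumption, the hypothetical storeMax below keeps a running maximum with the same pattern as addValue2.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeMax raises *addr to v if v is larger. It retries when another
// goroutine changes *addr between the load and the compare-and-swap.
func storeMax(addr *int32, v int32) {
	for {
		cur := atomic.LoadInt32(addr)
		if v <= cur {
			return // the stored value is already at least v
		}
		if atomic.CompareAndSwapInt32(addr, cur, v) {
			return // swap succeeded
		}
		// CAS failed: someone else updated *addr, so try again.
	}
}

// maxOf feeds 1..n into storeMax from n goroutines and returns the result.
func maxOf(n int32) int32 {
	var m int32
	var wg sync.WaitGroup
	for i := int32(1); i <= n; i++ {
		wg.Add(1)
		go func(v int32) {
			defer wg.Done()
			storeMax(&m, v)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt32(&m)
}

func main() {
	fmt.Println(maxOf(500)) // prints: 500
}
```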
Operations on struct types
- The risk is higher when you assign to a struct directly. If a reader reads the variable when the writer has updated only some of the fields, it sees a mix of old and new values, which breaks the integrity of the variable and makes the value it read wrong.
- To address reading and writing a variable from multiple threads, Go 1.4 introduced atomic.Value. It lets us avoid relying on the unsafe.Pointer type, which carries no compatibility guarantee, while wrapping reads and writes of arbitrary data types as atomic operations (making intermediate states invisible).
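// An example (needs the fmt, sync, sync/atomic and time packages)
type Config struct {
	Addr string
	Port string
}

func (c Config) String() string {
	return c.Addr + ":" + c.Port
}

func loadConfig() Config {
	// do something
	return Config{}
}

func atomicValue() {
	var config atomic.Value
	// Store an initial value first: Load returns nil before the first
	// Store, and the type assertion in the readers would panic on nil.
	config.Store(loadConfig())
	wg := sync.WaitGroup{}
	// Refresh the configuration in the background.
	go func() {
		for {
			time.Sleep(time.Millisecond)
			config.Store(loadConfig())
		}
	}()
	// Each reader always sees a complete Config.
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			c := config.Load().(Config)
			fmt.Println(c)
			wg.Done()
		}()
	}
	wg.Wait()
}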
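The integrity claim for structs can be checked in the same spirit as for integers. In this sketch the endpoint type, the two sample values and the helper mixedReads are all hypothetical: a writer alternates between two complete values, and the reader counts how often it sees a struct that is neither of them.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// endpoint is a hypothetical two-field struct used for the check.
type endpoint struct {
	Addr string
	Port string
}

// mixedReads returns how many loads saw a struct that mixes fields
// from two different stores.
func mixedReads(iterations int) int {
	a := endpoint{Addr: "10.0.0.1", Port: "80"}
	b := endpoint{Addr: "10.0.0.2", Port: "443"}

	var current atomic.Value
	current.Store(a) // store before reading so Load never returns nil

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			if i%2 == 0 {
				current.Store(b)
			} else {
				current.Store(a)
			}
		}
	}()

	mixed := 0
	for i := 0; i < iterations; i++ {
		if e := current.Load().(endpoint); e != a && e != b {
			mixed++
		}
	}
	wg.Wait()
	return mixed
}

func main() {
	fmt.Println("mixed reads:", mixedReads(200000))
}
```

With atomic.Value the count should stay at zero. Assigning the struct to a shared variable directly would be a data race and offers no such guarantee.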
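atomic.Value design and implementation
Everything in the atomic package except atomic.Value was written in assembly early on. The underlying implementation of atomic.Value, as its source further down shows, is built on those existing atomic functions (LoadPointer, StorePointer, and CompareAndSwapPointer).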
Goroutine preemption
In Go, the scheduler follows the GMP model. G is simply a goroutine; M is analogous to a kernel thread and is where G actually executes; P schedules Gs and provides the resources a G needs in order to run. In general the number of Ps equals the number of CPU cores, and it can also be set with runtime.GOMAXPROCS.
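The number of Ps can be inspected and changed at run time. A small sketch, in which the helper name setProcs is made up for illustration:

```go
package main

import (
	"fmt"
	"runtime"
)

// setProcs sets the number of Ps to n and returns the previous setting
// together with the setting now in effect.
func setProcs(n int) (prev, now int) {
	prev = runtime.GOMAXPROCS(n) // returns the previous value
	now = runtime.GOMAXPROCS(0)  // an argument below 1 only queries
	return
}

func main() {
	fmt.Println("CPU cores:", runtime.NumCPU())
	fmt.Println("Ps at startup:", runtime.GOMAXPROCS(0))

	prev, now := setProcs(2)
	fmt.Println("Ps before:", prev, "Ps after:", now)
}
```

The same setting can also be supplied through the GOMAXPROCS environment variable when the program starts.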
Scheduling rule: a given G cannot occupy M indefinitely. At certain points the runtime checks whether the G running on M can be preempted, that is, moved off M to make room. In the runtime, each P maintains a local queue (localq) of Gs waiting to run, and there is also a global queue (globalq) of waiting Gs. Scheduling means that P takes a G from localq or globalq and runs it on M. Preemption means that the runtime takes G out of the running state, saves its execution state, and puts it back into a queue, either some P's localq or globalq, to wait for the next scheduling round. As a result, when a preempted G runs again, its P may not be the same P as before.
Disabling preemption: the currently running G cannot be preempted until the no-preemption flag is cleared. The Go runtime implements both disabling and re-enabling preemption of G.
// atomic.Value source
type Value struct {
	v interface{} // any type of data can be stored
}

// ifaceWords is the internal representation of the empty interface{} type.
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

// Reading the data is a straightforward process.
func (v *Value) Load() (x interface{}) {
	vp := (*ifaceWords)(unsafe.Pointer(v))
	typ := LoadPointer(&vp.typ)
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		// The first store has not completed yet.
		return nil
	}
	// Build a new interface{} and return it.
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}

// Writing the data (how is integrity guaranteed?)
func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	// Bypass the Go type system and convert to an arbitrary pointer type.
	vp := (*ifaceWords)(unsafe.Pointer(v))  // the old value
	xp := (*ifaceWords)(unsafe.Pointer(&x)) // the new value
	for { // combined with CompareAndSwap, this acts as an optimistic lock
		typ := LoadPointer(&vp.typ)
		if typ == nil { // first write
			runtime_procPin() // disable preemption
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				// The lock was not acquired: another thread got there
				// first, so re-enter the loop.
				runtime_procUnpin()
				continue
			}
			// First assignment.
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin() // write succeeded
			return
		}
		if uintptr(typ) == ^uintptr(0) {
			// The first write has not completed yet; keep waiting.
			continue
		}
		// Later writes must store the same type as the first write.
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		StorePointer(&vp.data, xp.data)
		return
	}
}

// Disable preemption: mark the current G as non-preemptible on its M
// (the underlying runtime function also returns the ID of the current P).
func runtime_procPin()

// Re-enable preemption: after this call G can be preempted again.
func runtime_procUnpin()
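The rules visible in Load and Store above can be observed from the outside through the public API. In this sketch the helpers panics and valueRules are hypothetical names; only atomic.Value, Load and Store come from the standard library.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// panics reports whether calling f panicked.
func panics(f func()) (did bool) {
	defer func() {
		if recover() != nil {
			did = true
		}
	}()
	f()
	return false
}

// valueRules exercises the checks made by Load and Store.
func valueRules() (emptyLoadIsNil, nilStorePanics, typeChangePanics, sameTypeOK bool) {
	var v atomic.Value

	emptyLoadIsNil = v.Load() == nil                  // nothing stored yet
	nilStorePanics = panics(func() { v.Store(nil) })  // x == nil is rejected
	v.Store("first")                                  // first write fixes the type
	typeChangePanics = panics(func() { v.Store(42) }) // int after string is rejected
	sameTypeOK = !panics(func() { v.Store("second") }) && v.Load() == "second"
	return
}

func main() {
	fmt.Println(valueRules()) // prints: true true true true
}
```

This is also why code that calls Load followed by a type assertion should store an initial value first, or check for nil.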