In the Go standard library, the sync/atomic package exposes the atomic memory operations provided by the underlying hardware as Go functions.

A mutex is implemented in software, on top of the Go runtime and the operating system, while the operations in the atomic package are supported directly by the hardware. The package wraps certain instructions from the CPU's instruction set, and these instructions cannot be interrupted while they execute. Atomic operations can therefore guarantee concurrency safety without locks, and they generally scale better than locks as the number of CPUs grows.

Data types

  • int32
  • int64
  • uint32
  • uint64
  • uintptr
  • unsafe.Pointer

Operation types

Each function behaves as if the pseudocode beneath it ran as a single indivisible step.

  • Add or subtract: AddXXX

    *addr += delta
    return *addr
  • Compare and swap: CompareAndSwapXXX

    if *addr == old {
    	*addr = new
    	return true
    }
    return false
  • Load: LoadXXX

    return *addr
  • Store: StoreXXX

    *addr = val
  • Swap: SwapXXX

    old = *addr
    *addr = new
    return old
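The pseudocode above maps onto real calls roughly as follows. This is a minimal sketch on a single int32; the helper name demoOps is made up for illustration, and the same pattern applies to the other supported types.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// demoOps (a hypothetical helper) exercises each operation family once
// on one int32 and returns the final value of the variable.
func demoOps() int32 {
	var n int32

	// Store: n = 10
	atomic.StoreInt32(&n, 10)
	// Add: n = 15, the new value is returned
	added := atomic.AddInt32(&n, 5)
	// Subtract by adding a negative delta: n = 12
	atomic.AddInt32(&n, -3)
	// Compare and swap: n is 12, so it becomes 20 and true is returned
	swapped := atomic.CompareAndSwapInt32(&n, 12, 20)
	// Swap: n = 7, the old value 20 is returned
	old := atomic.SwapInt32(&n, 7)

	fmt.Println(added, swapped, old, atomic.LoadInt32(&n)) // 15 true 20 7
	return atomic.LoadInt32(&n)
}

func main() {
	demoOps()
}
```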

What operations are called atomic operations?

The ability of one or more operations to run without interruption on the CPU is called atomicity. To the outside world these operations appear as an indivisible whole: either all of them are performed or none are, and no one ever sees them half-performed. In practice, a CPU cannot always execute a series of operations without interruption, but if we can make the intermediate states invisible while the operations run, we can still claim that they are “indivisible”, that is, atomic.

In Go, an ordinary assignment statement is not guaranteed to be atomic. For example, writing a variable of type int64 on a 32-bit machine has an intermediate state, because the write is split into two MOV instructions: one writes the lower 32 bits and the other writes the higher 32 bits.
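As a rough illustration of what "no visible intermediate state" means, the sketch below has a writer flip a shared int64 between two bit patterns while a reader checks that it only ever sees one of the two. The function name tornReads and the chosen patterns are assumptions made for this example. With atomic.StoreInt64 and atomic.LoadInt64, the count of mixed ("torn") values stays at zero on any platform.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// tornReads (hypothetical helper) alternates a shared int64 between
// all-bits-clear and all-bits-set and counts how often the reader
// observes anything other than those two values.
func tornReads(iterations int) int {
	const a, b int64 = 0, -1
	var shared int64
	var wg sync.WaitGroup
	torn := 0 // only touched by the reader goroutine, read after Wait

	wg.Add(2)
	go func() { // writer
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			if i%2 == 0 {
				atomic.StoreInt64(&shared, b)
			} else {
				atomic.StoreInt64(&shared, a)
			}
		}
	}()
	go func() { // reader
		defer wg.Done()
		for i := 0; i < iterations; i++ {
			if v := atomic.LoadInt64(&shared); v != a && v != b {
				torn++
			}
		}
	}()
	wg.Wait()
	return torn
}

func main() {
	fmt.Println("torn reads:", tornReads(100000)) // torn reads: 0
}
```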

How do atomic operations compare with locks?

Atomic operations are supported directly by the hardware, while locks are implemented in software and rely on the scheduler to block and wake the waiting party. Locks should be used to protect a section of logic. For protecting an update to a single variable, atomic operations are usually more efficient and make better use of multiple cores. If you want to update a composite object, wrap it in atomic.Value.
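To make the comparison concrete, here is a small sketch of the same counter written both ways. The function names countWithMutex and countWithAtomic are invented for this example, and no performance numbers are claimed; both versions simply produce the same result.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithMutex increments a counter from n goroutines under a sync.Mutex.
func countWithMutex(n int) int64 {
	var (
		mu    sync.Mutex
		total int64
		wg    sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			total++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return total
}

// countWithAtomic does the same work with one atomic add per goroutine.
func countWithAtomic(n int) int64 {
	var (
		total int64
		wg    sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&total, 1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&total)
}

func main() {
	fmt.Println(countWithMutex(1000), countWithAtomic(1000)) // 1000 1000
}
```

The mutex version extends naturally to a critical section that touches several variables; the atomic version only covers the single word being updated.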

Operations on value types

  • If one thread has written the lower 32 bits and another thread reads the variable before the higher 32 bits are written, the reader gets a meaningless intermediate value, which can cause strange bugs in the program.
// CAS is less likely to succeed when the value is being changed frequently,
// so retry it in a for loop.
var value int32

func addValue1(delta int32) {
	for {
		// value may be read and written at the same time, so this
		// plain read can observe half-modified data.
		v := value
		if atomic.CompareAndSwapInt32(&value, v, v+delta) {
			break
		}
	}
}
  • Use the Load function to avoid reading half-written data
func addValue2(delta int32) {
	for {
		// Read the current value atomically.
		v := atomic.LoadInt32(&value)
		if atomic.CompareAndSwapInt32(&value, v, v+delta) {
			// Exit the loop once the swap succeeds.
			break
		}
	}
}
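The Load plus CompareAndSwap retry loop can be tried under contention with a sketch like the following. The helper runAdders and the goroutine count are assumptions for illustration; the loop itself mirrors addValue2.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var value int32

// addValue retries an atomic Load + CompareAndSwap pair until the swap succeeds.
func addValue(delta int32) {
	for {
		v := atomic.LoadInt32(&value)
		if atomic.CompareAndSwapInt32(&value, v, v+delta) {
			return
		}
		// Another goroutine changed value in between; try again.
	}
}

// runAdders (hypothetical helper) resets value, starts n goroutines that
// each add delta once, and returns the final value.
func runAdders(n int, delta int32) int32 {
	atomic.StoreInt32(&value, 0)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			addValue(delta)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&value)
}

func main() {
	fmt.Println(runAdders(100, 2)) // 200
}
```

For a plain addition, atomic.AddInt32 does the same job in one call; the retry loop earns its keep when the new value is computed from the old one in some other way, for example keeping a running maximum.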

Operations on struct types

  • The probability of problems is even higher if you assign to a struct directly. If a reader reads the variable when the writing thread has updated only some of the fields, it sees only part of the modification. This destroys the integrity of the variable, and the values read are simply wrong.
  • To address the problem of reading and writing such variables from multiple threads, Go 1.4 introduced atomic.Value. It lets us avoid relying on the unsafe.Pointer type, which is not covered by the compatibility guarantee, while still encapsulating reads and writes of arbitrary data types as atomic operations (that is, making the intermediate states invisible).
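// An example

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Config struct {
	Addr string
	Port string
}

func (c Config) String() string {
	return c.Addr + ":" + c.Port
}

func loadConfig() Config {
	// do something
	return Config{}
}

func atomicValue() {
	var config atomic.Value
	// Store an initial value first; otherwise Load returns nil and the
	// type assertion in the readers below would panic.
	config.Store(loadConfig())

	// Writer: refresh the configuration periodically.
	go func() {
		for {
			time.Sleep(time.Millisecond)
			config.Store(loadConfig())
		}
	}()

	// Readers: each goroutine sees a complete Config, never a partial one.
	wg := sync.WaitGroup{}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c := config.Load().(Config)
			fmt.Println(c)
		}()
	}
	wg.Wait()
}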

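atomic.Value design and implementation

Everything in the atomic package other than atomic.Value was originally written in assembly. atomic.Value itself is built on top of those existing primitives (LoadPointer, StorePointer and CompareAndSwapPointer), as the source further down shows.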

Goroutine preemption

Go's scheduler follows the GMP model. G is simply a goroutine; M, analogous to a kernel thread, is where a G actually executes; P schedules Gs and holds the resources a G needs in order to run. By default the number of Ps equals the number of CPU cores, and it can also be set with runtime.GOMAXPROCS.
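The number of Ps can be inspected and changed from ordinary code. The sketch below uses the real runtime.NumCPU and runtime.GOMAXPROCS calls; the helper names currentProcs and setProcs are invented for this example.

```go
package main

import (
	"fmt"
	"runtime"
)

// currentProcs reports the current number of Ps without changing it.
// Passing a value below 1 to runtime.GOMAXPROCS only queries the setting.
func currentProcs() int {
	return runtime.GOMAXPROCS(0)
}

// setProcs (hypothetical helper) sets the number of Ps to n and returns
// the previous and the new setting.
func setProcs(n int) (prev, now int) {
	prev = runtime.GOMAXPROCS(n)
	return prev, currentProcs()
}

func main() {
	fmt.Println("CPU cores:", runtime.NumCPU())
	fmt.Println("Ps:", currentProcs())

	prev, now := setProcs(2)
	fmt.Println("previous:", prev, "now:", now)

	setProcs(prev) // restore the original setting
}
```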

Scheduling rule: a G cannot occupy an M indefinitely. At certain points the runtime checks whether the G running on M can be preempted, that is, taken off M to make room for another G. Each P maintains a local queue (localq) of Gs waiting to run, and the runtime also keeps a global queue (globalq). Scheduling is when P takes a G from its local queue or the global queue and runs it on M. Preemption is the reverse: the runtime takes the G out of the running state, saves its execution context, and puts it back into a queue (the local queue of some P or the global queue) to wait for its next turn. As a result, when a preempted G runs again, its P may not be the same P it ran on before.

Disabling preemption: the currently running G cannot be preempted until the no-preemption flag is cleared. The Go runtime implements both disabling and re-enabling preemption for a G.

// atomic.Value source

type Value struct {
	v interface{} // any type of data can be stored
}

// ifaceWords is the internal representation of the empty interface type interface{}
type ifaceWords struct {
	typ  unsafe.Pointer
	data unsafe.Pointer
}

// Load: reading the data is straightforward
func (v *Value) Load() (x interface{}) {
	vp := (*ifaceWords)(unsafe.Pointer(v))
	typ := LoadPointer(&vp.typ)
	if typ == nil || uintptr(typ) == ^uintptr(0) {
		// The first store has not completed yet
		return nil
	}
	// Build a new interface{} from typ and data and return it
	data := LoadPointer(&vp.data)
	xp := (*ifaceWords)(unsafe.Pointer(&x))
	xp.typ = typ
	xp.data = data
	return
}

// Store: writing the data (how is data integrity ensured?)
func (v *Value) Store(x interface{}) {
	if x == nil {
		panic("sync/atomic: store of nil value into Value")
	}
	// Bypass the Go type system and reinterpret both values as *ifaceWords
	vp := (*ifaceWords)(unsafe.Pointer(v))  // the old value
	xp := (*ifaceWords)(unsafe.Pointer(&x)) // the new value
	for { // Combined with CompareAndSwap, this loop acts as an optimistic lock
		typ := LoadPointer(&vp.typ)
		if typ == nil { // First write
			runtime_procPin() // Disable preemption
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
				// CAS failed: another goroutine has already started the first store, so retry
				runtime_procUnpin()
				continue
			}
			// Complete the first store
			StorePointer(&vp.data, xp.data)
			StorePointer(&vp.typ, xp.typ)
			runtime_procUnpin() // Write succeeded
			return
		}
		if uintptr(typ) == ^uintptr(0) {
			// The first store is still in progress; keep waiting
			continue
		}
		// The first store has completed: later stores must use the same type
		if typ != xp.typ {
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		StorePointer(&vp.data, xp.data)
		return
	}
}

// Disable preemption: mark the current G as non-preemptible on its M and
// return the ID of the P it is currently running on.
func runtime_procPin()

// Re-enable preemption: after this call the G can be preempted again.
func runtime_procUnpin()
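The branches in Load and Store above show up as observable behavior from user code. The following sketch, with the invented helpers storePanics and demoValue, checks three of them: Load returns nil before the first store, Store rejects nil, and Store rejects a value whose type differs from the first one stored.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// storePanics (hypothetical helper) reports whether storing x into v panics.
func storePanics(v *atomic.Value, x interface{}) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	v.Store(x)
	return false
}

// demoValue (hypothetical helper) walks through the cases handled by
// Load and Store and returns what it observed.
func demoValue() (emptyLoadIsNil, nilPanics, mismatchPanics bool, final string) {
	var v atomic.Value

	// Nothing stored yet: Load returns nil.
	emptyLoadIsNil = v.Load() == nil
	// Storing nil is rejected with a panic.
	nilPanics = storePanics(&v, nil)
	// The first successful store fixes the type as string.
	storePanics(&v, "a")
	// Same type again: only the data word is replaced.
	storePanics(&v, "b")
	// A different type (int) is rejected with a panic.
	mismatchPanics = storePanics(&v, 42)
	// The failed store left the previous value in place.
	final = v.Load().(string)
	return
}

func main() {
	fmt.Println(demoValue()) // true true true b
}
```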
