Do you find Go’s sync package insufficient? Have you run into types that sync/atomic does not support?

Let’s take a look at what go-zero’s syncx package adds on top of the standard library.

Github.com/tal-tech/go…

Name               Role
AtomicBool         Atomic bool type
AtomicDuration     Atomic Duration type
AtomicFloat64      Atomic float64 type
Barrier            Barrier (wraps lock and unlock)
Cond               Condition variable
DoneChan           Graceful shutdown notification
ImmutableResource  A resource that is not modified after creation
Limit              Controls the number of requests
LockedCalls        Ensures that methods are called serially
ManagedResource    Resource management
Once               Provides a once func
OnceGuard          One-off resource management
Pool               A simple pool
RefResource        Reference-counted resource
ResourceManager    Resource manager
SharedCalls        Functionality similar to singleflight
SpinLock           Spin lock: spin + CAS
TimeoutLimit       Limit + timeout control

The components listed above are described one by one below.

atomic

Because there is no generics support, each type needs its own atomic wrapper. The following uses float64 as an example:

func (f *AtomicFloat64) Add(val float64) float64 {
	for {
		old := f.Load()
		nv := old + val
		if f.CompareAndSwap(old, nv) {
			return nv
		}
	}
}

func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {
	return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}

func (f *AtomicFloat64) Load() float64 {
	return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

func (f *AtomicFloat64) Set(val float64) {
	atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}
  • Add(val): loads the old value and tries to set old+val with CAS; if the CAS fails, the for loop retries;

  • CompareAndSwap(old, new): calls the CAS of the underlying atomic package;

  • Load(): calls atomic.LoadUint64 and converts the result to float64;

  • Set(val): calls atomic.StoreUint64.

To support other types, developers can follow the same approach: call the underlying atomic operation and convert to and from the desired type. For a bool, for example, false and true can be represented as 0 and 1.
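The bool case mentioned above can be sketched as follows. This is a minimal illustration, assuming a uint32 backing value where 0 means false and 1 means true; the method names and the boolToUint32 helper are chosen for this example and are not necessarily those of the syncx package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// AtomicBool is an illustrative bool wrapper stored as a uint32:
// 0 means false, 1 means true (an assumption of this sketch).
type AtomicBool uint32

// boolToUint32 is a hypothetical helper that maps a bool to 0 or 1.
func boolToUint32(v bool) uint32 {
	if v {
		return 1
	}
	return 0
}

// Set atomically stores v.
func (b *AtomicBool) Set(v bool) {
	atomic.StoreUint32((*uint32)(b), boolToUint32(v))
}

// True atomically reports whether the stored value is true.
func (b *AtomicBool) True() bool {
	return atomic.LoadUint32((*uint32)(b)) == 1
}

// CompareAndSwap atomically replaces old with val and reports whether it did so.
func (b *AtomicBool) CompareAndSwap(old, val bool) bool {
	return atomic.CompareAndSwapUint32((*uint32)(b), boolToUint32(old), boolToUint32(val))
}

func main() {
	var flag AtomicBool
	flag.Set(true)
	// Prints: true true false
	fmt.Println(flag.True(), flag.CompareAndSwap(true, false), flag.True())
}
```

Since Go 1.19 the standard library also provides atomic.Bool, so a hand-written wrapper like this is mainly useful on older toolchains or for types the standard library still lacks, such as float64.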

Barrier

Barrier simply wraps a business function, which is passed in as a closure, and takes care of locking and unlocking internally.

func (b *Barrier) Guard(fn func()) {
	b.lock.Lock()
	defer b.lock.Unlock()
	// Your own business logic
	fn()
}
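A short usage sketch of Guard follows. The Barrier type is redeclared here so the example is self-contained; the sync.Mutex field and the countWithBarrier helper are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Barrier mirrors the shape of the type shown above; using sync.Mutex
// for the lock field is an assumption of this sketch.
type Barrier struct {
	lock sync.Mutex
}

// Guard runs fn while holding the lock.
func (b *Barrier) Guard(fn func()) {
	b.lock.Lock()
	defer b.lock.Unlock()
	fn()
}

// countWithBarrier (a hypothetical helper) increments a shared counter
// from n goroutines, guarding every increment with the barrier.
func countWithBarrier(n int) int {
	var (
		barrier Barrier
		wg      sync.WaitGroup
		total   int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			barrier.Guard(func() { total++ })
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(countWithBarrier(100)) // prints 100
}
```

The caller never touches Lock or Unlock directly, so a forgotten unlock on an early return cannot happen.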

Cond/Limit/TimeoutLimit

Cond, together with Limit, makes up TimeoutLimit, so the three are covered together here:

func NewTimeoutLimit(n int) TimeoutLimit {
	return TimeoutLimit{
		limit: NewLimit(n),
		cond:  NewCond(),
	}
}

func NewLimit(n int) Limit {
	return Limit{
		pool: make(chan lang.PlaceholderType, n),
	}
}
  • limit is backed by a buffered channel;
  • cond is unbuffered;

Limit restricts the use of a kind of resource, so its pool is created with a preset number of slots, which the channel buffer provides. Cond works like a valve: an exchange only happens when both sides are ready, so it is unbuffered and gives synchronous control.
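The buffered-channel idea behind Limit can be sketched as below. This is a simplified illustration, not the package's actual code: it uses struct{} instead of lang.PlaceholderType, assumes that borrowing sends into the channel and returning receives from it, and the error value is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrLimitReturn is a hypothetical error for returning more than was borrowed.
var ErrLimitReturn = errors.New("nothing borrowed, cannot return")

// Limit bounds concurrent use with a buffered channel of capacity n.
type Limit struct {
	pool chan struct{}
}

// NewLimit creates a Limit that allows at most n outstanding borrows.
func NewLimit(n int) Limit {
	return Limit{pool: make(chan struct{}, n)}
}

// TryBorrow takes a slot if one is free; it never blocks.
func (l Limit) TryBorrow() bool {
	select {
	case l.pool <- struct{}{}:
		return true
	default:
		return false
	}
}

// Return gives a slot back; it fails if nothing is currently borrowed.
func (l Limit) Return() error {
	select {
	case <-l.pool:
		return nil
	default:
		return ErrLimitReturn
	}
}

func main() {
	l := NewLimit(2)
	// The third borrow fails because both slots are taken: true true false
	fmt.Println(l.TryBorrow(), l.TryBorrow(), l.TryBorrow())
	// After returning one slot, borrowing works again: true true
	fmt.Println(l.Return() == nil, l.TryBorrow())
}
```

The channel capacity is the whole bookkeeping: a full buffer means the limit is reached, and no separate counter or mutex is needed.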

Here we look at session management in stores/mongo to see how resources are controlled:

func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
	// Option parameter injection
	// ...
	// Check whether a resource can be borrowed from the limit
	if err := cs.limit.Borrow(o.timeout); err != nil {
		return nil, err
	} else {
		return cs.Copy(), nil
	}
}

func (l TimeoutLimit) Borrow(timeout time.Duration) error {
	// 1. If the limit still has resources, take one and return
	if l.TryBorrow() {
		return nil
	}
	// 2. The limit is used up
	var ok bool
	for {
		// Block until a signal arrives on cond or the timeout expires;
		// the remaining timeout is returned
		timeout, ok = l.cond.WaitWithTimeout(timeout)
		// Signaled: try to take a resource again.
		// See Return()
		if ok && l.TryBorrow() {
			return nil
		}
		// Timeout control
		if timeout <= 0 {
			return ErrTimeout
		}
	}
}

func (l TimeoutLimit) Return() error {
	// Return a resource
	if err := l.limit.Return(); err != nil {
		return err
	}
	// Synchronously notify another goroutine that is waiting for a resource
	l.cond.Signal()
	return nil
}
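Borrow and Return above rely on WaitWithTimeout and Signal, whose bodies are not shown in this article. The following is a minimal sketch of how an unbuffered channel can provide both, assuming a non-blocking send in Signal; the field name and the waitOnce helper are made up for this example.

```go
package main

import (
	"fmt"
	"time"
)

// Cond is a sketch of a condition variable built on an unbuffered channel.
type Cond struct {
	signal chan struct{}
}

// NewCond creates a Cond with an unbuffered signal channel.
func NewCond() *Cond {
	return &Cond{signal: make(chan struct{})}
}

// WaitWithTimeout blocks until Signal is called or the timeout elapses.
// It returns the remaining time and whether a signal arrived.
func (c *Cond) WaitWithTimeout(timeout time.Duration) (time.Duration, bool) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	begin := time.Now()
	select {
	case <-c.signal:
		return timeout - time.Since(begin), true
	case <-timer.C:
		return 0, false
	}
}

// Signal wakes one waiter if there is one; otherwise it does nothing.
func (c *Cond) Signal() {
	select {
	case c.signal <- struct{}{}:
	default:
	}
}

// waitOnce (a hypothetical helper) reports whether a waiter with the given
// timeout receives a signal that is sent after signalAfterMs milliseconds.
func waitOnce(timeoutMs, signalAfterMs int) bool {
	c := NewCond()
	go func() {
		time.Sleep(time.Duration(signalAfterMs) * time.Millisecond)
		c.Signal()
	}()
	_, ok := c.WaitWithTimeout(time.Duration(timeoutMs) * time.Millisecond)
	return ok
}

func main() {
	fmt.Println(waitOnce(1000, 10)) // signal arrives in time: true
	fmt.Println(waitOnce(10, 1000)) // timeout fires first: false
}
```

Because the channel is unbuffered and the send is non-blocking, a signal is delivered only if a goroutine is already waiting, which matches the valve analogy used earlier.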

Resource management

The same folder contains both ManagedResource and ResourceManager, whose names are similar, so the two components are described together.

First, from the structure:

type ManagedResource struct {
	// The managed resource
	resource interface{}
	lock     sync.RWMutex
	// The logic for generating the resource, supplied by the developer
	generate func() interface{}
	// Compares two resources
	equals   func(a, b interface{}) bool
}

type ResourceManager struct {
	// Resources; as the io.Closer type shows, these are I/O related
	resources   map[string]io.Closer
	sharedCalls SharedCalls
	// Guards access to the resources map
	lock        sync.RWMutex
}

Then look at the method signatures for getting a resource:

func (manager *ResourceManager) GetResource(key string, create func() (io.Closer, error)) (io.Closer, error)

// Take the resource (generate one first if there is none)
func (mr *ManagedResource) Take() interface{}
// Mark a resource as broken: if it matches the cached resource according to equals, the cached resource is reset
func (mr *ManagedResource) MarkBroken(resource interface{})
  1. ResourceManager uses SharedCalls to prevent duplicate requests and caches resources in its internal resources map. The create func passed in returns an io.Closer, so it is I/O related; this is commonly used for caching network resources.

  2. ManagedResource caches a single resource in an interface{} field rather than a map, but it provides Take() and accepts a generate() func, which lets developers refresh the resource themselves.

So in terms of usage:

  • ResourceManager: manages network resources, such as database connections;
  • ManagedResource: suits resources that change over time; resources can be compared before and after to decide when to refresh them. For example: token management and validation.
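The token use case can be sketched as follows. The struct matches the one shown earlier, but the bodies of Take and MarkBroken, the NewManagedResource constructor, and the tokenSequence helper are assumptions written for this illustration rather than the package's confirmed implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// ManagedResource caches one resource and regenerates it on demand.
type ManagedResource struct {
	resource interface{}
	lock     sync.RWMutex
	generate func() interface{}
	equals   func(a, b interface{}) bool
}

// NewManagedResource is a hypothetical constructor for this sketch.
func NewManagedResource(generate func() interface{}, equals func(a, b interface{}) bool) *ManagedResource {
	return &ManagedResource{generate: generate, equals: equals}
}

// Take returns the cached resource, generating it first if none is cached.
func (mr *ManagedResource) Take() interface{} {
	mr.lock.RLock()
	resource := mr.resource
	mr.lock.RUnlock()
	if resource != nil {
		return resource
	}

	mr.lock.Lock()
	defer mr.lock.Unlock()
	// Another goroutine may have generated it between the two locks.
	if mr.resource == nil {
		mr.resource = mr.generate()
	}
	return mr.resource
}

// MarkBroken drops the cached resource if it equals the one reported broken.
func (mr *ManagedResource) MarkBroken(resource interface{}) {
	mr.lock.Lock()
	defer mr.lock.Unlock()
	if mr.equals(mr.resource, resource) {
		mr.resource = nil
	}
}

// tokenSequence takes a token twice, marks it broken, then takes again.
func tokenSequence() []string {
	version := 0
	mr := NewManagedResource(
		func() interface{} { version++; return fmt.Sprintf("token-%d", version) },
		func(a, b interface{}) bool { return a == b },
	)
	first := mr.Take().(string)
	second := mr.Take().(string) // served from the cache
	mr.MarkBroken(first)         // e.g. the token was rejected as expired
	third := mr.Take().(string)  // regenerated by generate
	return []string{first, second, third}
}

func main() {
	fmt.Println(tokenSequence()) // prints [token-1 token-1 token-2]
}
```

Passing the broken resource to MarkBroken, rather than resetting unconditionally, keeps a late report about an old token from discarding a token that has already been refreshed.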

RefResource

This is similar to reference counting in GC:

  • Use() -> ref++
  • Clean() -> ref--; if ref == 0 -> clean up the resource

func (r *RefResource) Use() error {
	// Mutually exclusive access
	r.lock.Lock()
	defer r.lock.Unlock()
	// Already cleaned: the resource can no longer be used
	if r.cleaned {
		return ErrUseOfCleaned
	}
	// Increment the reference count
	r.ref++
	return nil
}
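Only Use is shown above. A minimal sketch of the Clean side, following the two bullets, might look like this; the field types, the NewRefResource constructor, the error text, and the refDemo helper are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrUseOfCleaned is returned when a cleaned resource is used again.
var ErrUseOfCleaned = errors.New("using a cleaned resource")

// RefResource counts references and runs clean when the count drops to zero.
type RefResource struct {
	lock    sync.Mutex
	ref     int32
	cleaned bool
	clean   func()
}

// NewRefResource is a hypothetical constructor for this sketch.
func NewRefResource(clean func()) *RefResource {
	return &RefResource{clean: clean}
}

// Use increments the reference count unless the resource is already cleaned.
func (r *RefResource) Use() error {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.cleaned {
		return ErrUseOfCleaned
	}
	r.ref++
	return nil
}

// Clean decrements the reference count and runs the clean func at zero.
func (r *RefResource) Clean() {
	r.lock.Lock()
	defer r.lock.Unlock()
	if r.cleaned {
		return
	}
	r.ref--
	if r.ref == 0 {
		r.cleaned = true
		r.clean()
	}
}

// refDemo uses the resource twice, cleans it twice, then tries to use it again.
func refDemo() (cleanCalls int, errAfterClean error) {
	r := NewRefResource(func() { cleanCalls++ })
	_ = r.Use()
	_ = r.Use()
	r.Clean() // ref 2 -> 1, nothing happens yet
	r.Clean() // ref 1 -> 0, the clean func runs
	errAfterClean = r.Use()
	return cleanCalls, errAfterClean
}

func main() {
	calls, err := refDemo()
	fmt.Println(calls, err) // prints: 1 using a cleaned resource
}
```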

SharedCalls

With SharedCalls, multiple concurrent requests for the same key trigger only one actual call; the remaining requests simply “sit idle” and share its result. This design reduces the concurrent load on the resource service and prevents cache breakdown.

This component is reused in other components, for example ResourceManager.

Similarly, whenever a resource is accessed concurrently at high frequency, SharedCalls can be used to share the result.

// Multiple requests ask for the same resource at the same time through Do
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	// Acquire the lock first
	g.lock.Lock()

	// Look up the in-flight call for this key and store it in c
	if c, ok := g.calls[key]; ok {
		// A call was found, so release the lock; the call may hold no result yet, it is only a placeholder
		g.lock.Unlock()
		// wg.Wait blocks while another goroutine is still fetching the resource
		c.wg.Wait()
		// Once wg.Wait returns, the fetch has finished and the result can be returned directly
		return c.val, c.err
	}

	// No call in flight: invoke makeCall. The lock is still held here, so only one goroutine can reach makeCall
	c := g.makeCall(key, fn)
	// Return the result of the call
	return c.val, c.err
}
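To make the flow concrete, here is a self-contained sketch that pairs the Do method above with one possible makeCall. The article does not show makeCall, so its body, the call struct, the newSharedGroup constructor, and the fetchConcurrently helper are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call holds the in-flight state and the result for one key.
type call struct {
	wg  sync.WaitGroup
	val interface{}
	err error
}

type sharedGroup struct {
	calls map[string]*call
	lock  sync.Mutex
}

func newSharedGroup() *sharedGroup {
	return &sharedGroup{calls: make(map[string]*call)}
}

func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
	g.lock.Lock()
	if c, ok := g.calls[key]; ok {
		g.lock.Unlock()
		c.wg.Wait()
		return c.val, c.err
	}
	c := g.makeCall(key, fn)
	return c.val, c.err
}

// makeCall is entered with g.lock held. It registers the call, releases the
// lock, runs fn, then removes the key and wakes up the waiters.
func (g *sharedGroup) makeCall(key string, fn func() (interface{}, error)) *call {
	c := new(call)
	c.wg.Add(1)
	g.calls[key] = c
	g.lock.Unlock()

	defer func() {
		g.lock.Lock()
		delete(g.calls, key)
		g.lock.Unlock()
		c.wg.Done()
	}()

	c.val, c.err = fn()
	return c
}

// fetchConcurrently runs n concurrent Do calls for one key and returns how
// many times the slow function actually executed.
func fetchConcurrently(n int) int64 {
	g := newSharedGroup()
	var executions int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Do("user:1", func() (interface{}, error) {
				atomic.AddInt64(&executions, 1)
				time.Sleep(50 * time.Millisecond) // simulate a slow lookup
				return "alice", nil
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&executions)
}

func main() {
	// Typically 1; it can be higher if a goroutine starts after the first call finished.
	fmt.Println("executions:", fetchConcurrently(10))
}
```

Note that the key is deleted as soon as the call finishes, so results are shared only among requests that overlap in time; this is deduplication of in-flight calls, not a long-lived cache.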

Conclusion

One of the main themes of go-zero’s design has always been not to reinvent the wheel; it also distills common business logic into components, which is what frameworks and components are for.

Stay tuned for more articles on go-zero’s design and implementation. You are welcome to follow and use the project.

Project address

Github.com/tal-tech/go…

You are welcome to use go-zero and star the project to support us!

WeChat group

Follow the “micro service practice” WeChat official account and reply “group” to get the community group QR code.