introduce

In languages with garbage collection features, there is a performance penalty when gc occurs. To reduce the impact of GC, it is common practice to reduce the number of memory requests for small objects and to have as few scan and clean active objects as possible each time a garbage collection occurs. Sync.pool helps build a Pool of objects in your application, provides object reuse capability, and is itself scalable and concurrency safe.

Major structurePoolExport two methods:GetPut.Get is used to Get available objects from the PoolIf the available object is empty, it passesNewPredefined func creates new objects.Put puts an object into a Pool for the next retrieval.

Get

func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } l, pid := p.pin() x := l.private l.private = nil if x == nil { // Try to pop the head of the local shard. We prefer // the head over the tail for temporal locality of // reuse. x, _ = l.shared.popHead() if x == nil { x = p.getSlow(pid) } } runtime_procUnpin() if race.Enabled { race.Enable() if x ! = nil { race.Acquire(poolRaceAddr(x)) } } if x == nil && p.New ! = nil { x = p.New() } return x }Copy the code

First take a look at the logic of the GET method (a general understanding of the GMP scheduling model is required before looking at this)

  • throughpingetpoolLocalRuns with the current Goroutine bindingPId. After each goroutine is created, it hangsPOn the structure; At run time, bindings are requiredPIn theMTo execute. Therefore, there is no need to lock poolLocal operations that point to private and are thread-safe
  • Set up thexAnd emptyprivate
  • xNull indicates that the local object is not set becausePThere are more than oneGIf a time-slice coroutine 1 retrieves the private object and then nullifes it, then the next time slice G2 retrieves it is nil. You need to goshareGets the header element from,shareIn multiplePBoth read and write are requiredlockBut it’s not locked, for reasons that we’ll talk about later
  • ifshareAlso returns null, callgetSlow()Function fetch, and we’ll look at the internal implementation
  • The runtime_procUnpin() method, which we’ll look at in more detail later
  • Finally, if no reusable object is found and setNewTo initialize a new object

The local field of Pool represents the poolLocal pointer. When fetching, first check whether the private domain is empty, and then read from the share, or steal from another P if empty, similar to goroutine scheduling mechanism.

pin

Let’s look at the last few questions in detail. First, the pin method gets the poolLocal of the current P. The logic of the method is relatively simple

func (p *Pool) pin() *poolLocal {
    pid := runtime_procPin()
    s := atomic.LoadUintptr(&p.localSize) // load-acquire
    l := p.local                          // load-consume
    if uintptr(pid) < s {
        return indexLocal(l, pid)
    }
    return p.pinSlow()
}
Copy the code

Runtime_procPin returns the current PID, see runtime for implementation details

//go:linkname sync_runtime_procPin sync.runtime_procPin //go:nosplit func sync_runtime_procPin() int { return procPin() } //go:linkname sync_runtime_procUnpin sync.runtime_procUnpin //go:nosplit func sync_runtime_procUnpin() { procUnpin() }  //go:nosplit func procPin() int { _g_ := getg() mp := _g_.m mp.locks++ return int(mp.p.ptr().id) } //go:nosplit func procUnpin() { _g_ := getg() _g_.m.locks-- }Copy the code
  • pinGet the address of the current goroutine, so that g corresponds tomIn the structurelocksField ++, returnspId.unPinIs tomthelocksField — why do I do that?

One of The Times when coroutines are scheduled: if a particular G consumes CPU resources for an extended period of time, a preemptive schedule occurs based on locks == 0. It’s essentially preventing preemption.

// One round of scheduler: Find a runnable goroutine and execute it. // Never returns. Func schedule() {_g_ := getg() if _g_.m.locks ! = 0 { throw("schedule: holding locks") } ... }Copy the code

Why disable scheduling? Because scheduling is to unbind M and P, and let P unbind other threads and execute code segments of other threads. In get, the first is to obtain the private p of the current Goroutine binding. If the scheduling is not prohibited, the subsequent fetch is not the p of the current coroutine runtime, which will pollute the data on other P and cause unknown errors.

poolChain

PoolChain is a double-ended linked list with the following structure:

type poolChain struct {
	head *poolChainElt
	tail *poolChainElt
}
Copy the code

poolChain.popHead

Poolchain-pophead is retrieved from poolDequeue’s popHead method. If not, prev is found and nil is returned.

func (c *poolChain) popHead() (interface{}, bool) { d := c.head for d ! = nil { if val, ok := d.popHead(); ok { return val, ok } // There may still be unconsumed elements in the // previous dequeue, so try backing up. d = loadPoolChainElt(&d.prev) } return nil, false }Copy the code

So let’s make a distinction herepoolChainandpoolDequeueThere are methods with the same name for both structures, but the structure and logic are completely different

type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt
}
type poolChainElt struct {
	poolDequeue
    next, prev *poolChainElt
}
type poolDequeue struct {
    headTail uint64
    vals []eface
}
Copy the code

PoolChainElt is a list that goes in the opposite direction from head -> tail to prev and vice versa. PoolDequeue is a circular linked list. The headTail field holds the addresses of head and tail, where the higher 32 bits represent head and the lower 32 bits represent tail.

poolDequeue.popHead

func (d *poolDequeue) popHead() (interface{}, bool) {
    var slot *eface
    for {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs)
        if tail == head {
            return nil, false
        }
        head--
        ptrs2 := d.pack(head, tail)
        if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
            slot = &d.vals[head&uint32(len(d.vals)-1)]
            break
        }
    }

    val := *(*interface{})(unsafe.Pointer(slot))
    if val == dequeueNil(nil) {
        val = nil
    }
    *slot = eface{}
    return val, true
}
Copy the code
  • seeif tail == head If the first address is the same, the linked list is emptypoolDequeueIt’s really a circular list;
  • head--afterpack(head, tail)Get the new address ptrs2, if PTRS == ptrs2, modifyheadTailAddress;
  • Convert slot to value of type interface{};

getSlow

If you don’t get an edible object from the shared popHead, you need to get it from getSlow

func (p *Pool) getSlow(pid int) interface{} { size := atomic.LoadUintptr(&p.localSize) // load-acquire locals := p.local // load-consume // locals, from the tail of other P for I := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i+1)%int(size)) if x, _ := l.shared.popTail(); x ! = nil { return x } } size = atomic.LoadUintptr(&p.victimSize) if uintptr(pid) >= size { return nil } // Try from the victim to poolLocal, according to the first private - > Shared order for locals = p.v ictim l: = indexLocal (locals, pid) if x: = l.p rivate; x ! = nil { l.private = nil return x } for i := 0; i < int(size); i++ { l := indexLocal(locals, (pid+i)%int(size)) if x, _ := l.shared.popTail(); x ! = nil { return x } } atomic.StoreUintptr(&p.victimSize, 0) return nil }Copy the code

Get the object from locals, using the []poolLocal pointed to by the victim field. This is actually a reference to a mechanism called Victim Cache, which is explained here.

poolChain.popTail

func (c *poolChain) popTail() (interface{}, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			return nil, false
		}
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
Copy the code
  • d2isdthenextNode,dWe’re already at the end of the list, and this also confirms what we were talking aboutpoolChainThe end of a linked list is the opposite of a normal list (I don’t know why). ifd2Null indicates that it has reached the head of the list, so it returns directly;
  • If the tail node gets successfully, it will be returned directly. The returned position will wait for the next GET to be deleted. Because it is stolen from other P, objects may be acquired by multiple coroutines at the same time, so concurrency security needs to be ensured.
  • whypopHeadThere are two reasons for not deleting a linked list node. First, popHead only the current coroutine operates on its own P, popTail is stolen if inpopHeadThe author wants to minimize the overhead of the GET phase. Second, becausepoolChainStructure itself is a linked list, no matter which step to do the result is the same, it is better to unify the tail to delete when obtaining.

poolDequeue.popTail

func (d *poolDequeue) popTail() (interface{}, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			return nil, false
		}
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	val := *(*interface{})(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)

	return val, true
}
Copy the code

The poolDequeue. PopHead method has the same logic as the poolDequeue. PopHead method.

Put

func (p *Pool) Put(x interface{}) { if x == nil { return } if race.Enabled { if fastrand()%4 == 0 { // Randomly drop x on floor. return } race.ReleaseMerge(poolRaceAddr(x)) race.Disable() } l, _ := p.pin() if l.private == nil { l.private = x x = nil } if x ! = nil { l.shared.pushHead(x) } runtime_procUnpin() if race.Enabled { race.Enable() } }Copy the code

Set poolLocal to private, and if private already exists, write to shared.pushHead.

poolChain.pushHead

Func (c *poolChain) pushHead(val interface{}) {d := c. ead if d == nil {// initialize ring, Const initSize = 8 d = new(poolChainElt) d.als = make([]eface, initSize) c.read = d storePoolChainElt(& c.stitch, D)} if d.bushhead (val) {return} // If the ring is full, create a new ring with double the size. NewSize := len(d.als) * 2 if newSize >= dequeueLimit {// Can't make it any bigger. NewSize = dequeueLimit } d2 := &poolChainElt{prev: d} d2.vals = make([]eface, newSize) c.head = d2 storePoolChainElt(&d.next, d2) d2.pushHead(val) }Copy the code

If the node is empty, a new poolChainElt object is created as the head node, and pushHead is called to put it into the circular queue. If the placement fails, create a poolChainElt node that is twice the size and not larger than dequeueLimit (2 ^ 30). All vals must be an integer power of 2.

func (d *poolDequeue) pushHead(val interface{}) bool { ptrs := atomic.LoadUint64(&d.headTail) head, tail := d.unpack(ptrs) if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { return false } slot := &d.vals[head&uint32(len(d.vals)-1)] typ := atomic.LoadPointer(&slot.typ) if typ ! = nil { return false } if val == nil { val = dequeueNil(nil) } *(*interface{})(unsafe.Pointer(slot)) = val atomic.AddUint64(&d.headTail, 1<<dequeueBits) return true }Copy the code

Check whether the ring is full, and then find the slot corresponding to the head position to check whether TYp is empty, because popTail sets val first and then typ to nil. If there is a conflict, it will return directly.

Conclusion:

The entire object pool is made up of several main structures, which are related as follows:

poolCleanup

Func for global cleanup is registered to run at the start of each GC. Since every GC cleans up objects in the pool, what’s the advantage of object reuse? The poolCleanup writes allPools objects to the oldPools object and then cleans up its own objects at each GC. In other words, if the object is applied, it will go through two gc before it is completely reclaimed. – P.local will be set to P.victim first, isn’t it similar to the feeling of new generation and old generation?

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}
func poolCleanup() {
    for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// Move primary cache to victim cache.
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	oldPools, allPools = allPools, nil
}
Copy the code

As you can see, in scenarios where GC occurs infrequently, sync.pool object reuse can reduce frequent memory requisition and reclamation.

References

  • Mp.weixin.qq.com/s?__biz=MzA…
  • medium.com/@genchilu/w…