Last year, I talked with Dong da about the timer of Go. At that time, I probably noticed that the code was quad heap, but I neglected the implementation details.
A few days ago and Liu Ding and Dong Da talked about the timer thing, just take this opportunity to Go timer complete comb again. Netpoll is not involved, because I haven’t looked through the code yet.
Dong da article network and how to combine timer are mentioned, interested can see here.
Do not be surprised to see the same picture on both sides, so this article can be counted as linkage, haha.
Timer is a very important function of the server program, especially the system that deals with the network or file system. We will add a reasonable timeout for each IO operation to avoid resource leakage under various specific circumstances. So it’s important to understand how timers work.
PS: It seems that Ghost’s support for Unicode string diagrams is not particularly good either. If you’re feeling uncomfortable, check out the original article, here. If you have inspiration, welcome to give me a star~
The timer
Simple usage
Date, Parse, ParseInLocation. Focus on the following functions:
Ticker related
func Tick(d Duration) <-chan Time
func NewTicker(d Duration) *Ticker
func (t *Ticker) Stop()
Copy the code
package main
import (
"fmt"
"time"
)
func main() {
for t := range time.Tick(time.Second * 2) {
fmt.Println(t, "hello world")
}
}
Copy the code
package main
import (
"fmt"
"time"
)
func main() {
ticker := time.NewTicker(time.Second * 2)
for {
select {
case t := <-ticker.C:
fmt.Println(t, "hello world")
}
}
}
Copy the code
Note that ticker should be stopped manually when not in use. Failure to stop may cause timer leaks, as shown in the following code:
package main
import (
"fmt"
"time"
)
func main() {
for {
select {
case t := <-time.Tick(time.Second * 2):
fmt.Println(t, "hello world")
}
}
}
Copy the code
Leaks can lead to more and more timer objects accumulating in the time heap, causing more trouble.
The timer related
func After(d Duration) <-chan Time
func NewTimer(d Duration) *Timer
func (t *Timer) Reset(d Duration) bool
func (t *Timer) Stop() bool
Copy the code
Time. After is generally used to control certain actions that take a long time, and not to wait After a timeout, so that the program behavior is predictable. If you do not release resources by timeout cancellation, local resources may pile up due to slow response from dependent parties, such as FDS, connection counts, memory usage, and so on. As a result, the service is down.
This simulates long behavior with the property that blocking channel reads will block forever, jumping out of select after timeout.
package main
import "time"
func main() {
var ch chan int
select {
case <-time.After(time.Second):
println("time out, and end")
case <-ch:
}
}
Copy the code
Time. After, unlike time.Tick, is fired once, After which the timer itself is deleted from the time heap. < time-after: < time-time. After: < time-time. After: < time-time. After: < time-time. After: < time-time. After: < time-time. After: < time-time. After: < time-time. After: < time-time. After:
package main
import "time"
func main() {
var ch = make(chan int)
go func() {
for {
ch <- 1
}
}()
for {
select {
case <-time.After(time.Second):
println("time out, and end")
case <-ch:
}
}
}
Copy the code
In the code above, the <-ch case is executed for a short time each time, but each time a select is entered, time.After assigns a new timer. Therefore, a large number of useless timers will be created in a short period of time. Although useless timers will disappear after being triggered, this writing method will cause a meaningless waste of CPU resources. The correct way to write a timer is to reuse it as follows:
package main
import "time"
func main() {
var ch = make(chan int)
go func() {
for {
ch <- 1
}
}()
timer := time.NewTimer(time.Second)
for {
timer.Reset(time.Second)
select {
case <-timer.C:
println("time out, and end")
case <-ch:
}
}
}
Copy the code
As with the Ticker, if the previous timer is no longer useful, you can manually Stop the timer to remove it from the time heap.
Source code analysis
The data structure
+--------+ | timers | +----+---++----+----+----+----+----+-----------------------+----+ | | | | | | | | | | | 0 | 1 | 2 | 3 | 4 5 6 | | |... | | + 63 - + - + - + - + - + - + - + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - + - + | | | | | | | | | | | | | | | | | | +--------------------+ | | | +--------------------+ | | |--------> | cacheline size * N | <--------+ | |--------> | cacheline size * N | <--------+ | | +--------------------+ | | | +--------------------+ | | +-------------+-----------------+----------+ | +-------------+-----------------+----------+ +>|timersBucket | | | +--->|timersBucket | | | +-------------+-----------------+ pad | +-------------+-----------------+ pad | | lock mutex | | | lock mutex | | +-------------------------------+ | +-------------------------------+ | | gp *g | | | gp *g | | +-------------------------------+ | +-------------------------------+ | | created bool | | | created bool | | + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + |...... +-------------------------------+ | | sleeping bool | | | sleeping bool | | +-------------------------------+ | +-------------------------------+ | | rescheduling bool | | | rescheduling bool | | +-------------------------------+ | +-------------------------------+ | | sleepUntil int64 | | | sleepUntil int64 | | +-------------------------------+ | +-------------------------------+ | | waitnote note | | | waitnote note | | +-------------------------------+ | +-------------------------------+ | | t []*timer | | | t []*timer | | +-------------------------------+----------+ +-------------------------------+----------+ | | | v +---+ | 0 | +---+ | | | | v +---+---+---+---+ | 1 | 2 | 3 | 4 | +-+-+-+-+-+-+-+-+ +---------------------------------+ | | +------------------------------------------+ | +------+ +--------+ | v | | v +---+---+---+---+ v v +---+---+---+---+ | | | | | +---+---+---+---+ +---+---+---+---+ | | | | | +-+-+-+-+---+---+ | | | | | | | | | | +---+---+-+-+-+-+ | | +---+---+---+---+ +---+---+---+---+ | | +---------+ +---------+ +---------+ +-----+ | | | | | | | | v v v v +---+---+---+---+ +---+---+---+---+ +---+---+---+---+ + - + - + - + - + | | | | | | | | | |... | | | | | | | | | | +---+---+---+---+ +---+---+---+---+ +---+---+---+---+ +---+---+---+---+Copy the code
The following conclusions can be viewed in conjunction with the picture above.
Timers array is defined in runtime/time.go:
var timers [timersLen]struct {
timersBucket
pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}
Copy the code
In early implementations of Go, there was a global timer, but the global timer heap had to be locked, so multiple cores exposed poor performance due to lock contention. As of some version (well, I don’t know which one, maybe 1.10), Go timers have been modified to implement this multiple time heap implementation and are currently deadlocked to 64 in runtime:
const timersLen = 64
Copy the code
If it is equal to GOMAXPROCS, the time heap will be reallocated and modified at procreSize. Writing to 64 is a tradeoff between memory usage and performance. If GOMAXPROCS is larger than 64, then it is possible that multiple Ps will share the same time heap. Of course, I’ve never seen a CPU with more than 64 cores in the real world.
The element of the Timers array is an anonymous struct containing the timersBucket and pad members, which are intended to fill multiples of the struct to the cacheline to avoid false sharing between different ps. This is common in multi-core programming scenarios. Structure of timerBucket:
//go:notinheap type timersBucket struct { lock mutex gp *g created bool sleeping bool rescheduling bool sleepUntil int64 waitnote note t []*timer }Copy the code
T is our time heap, but this is slightly different from our traditional heap structure, which is divided into four forks. This design is first seen. TimersBucket also has a special comment go:notinheap.
go:notinheap applies to type declarations. It indicates that a type must never be allocated from the GC’d heap. Specifically, pointers to this type must always fail the runtime.inheap check. The type may be used for global variables, for stack variables, or for objects in unmanaged memory (e.g., allocated with sysAlloc, persistentalloc, fixalloc, or from a manually-managed span). Specifically:
- new(T), make([]T), append([]T, …) and implicit heap allocation of T are disallowed. (Though implicit allocations are disallowed in the runtime anyway.)
- A pointer to a regular type (other than unsafe.Pointer) cannot be converted to a pointer to a go:notinheap type, even if they have the same underlying type.
- Any type that contains a go:notinheap type is itself go:notinheap. Structs and arrays are go:notinheap if their elements are. Maps and channels of go:notinheap types are disallowed. To keep things explicit, any type declaration where the type is implicitly go:notinheap must be explicitly marked go:notinheap as well.
- Write barriers on pointers to go:notinheap types can be omitted. The last point is the real benefit of go:notinheap. The runtime uses it for low-level internal structures to avoid memory barriers in the scheduler and the memory allocator where they are illegal or simply inefficient. This mechanism is reasonably safe and does not compromise the readability of the runtime.
Well, you just need to know about it, you’re not going to use it in user code.
Four fork small top heap properties
The quadheap is shorter than the binary heap in height. All (up to four) children of a node are larger than this node. A node’s (only one) parent must be smaller than the current node. Here is a typical quadheap with the values filled in:
+-----+ | | | 0 | +-----+ | | | v +-----+-----+-----+-----+ | | | | | | 3 | 2 | 2 | 10 | +-----+-----+-----+-----+ | | | | | | | | +----------+ | | | | +----------------+ 4*i+1 +-----------------------+ | | +-----------------------------+ | +----------+ +-------------------+ +---+ | | | | | | | | | v | | v +-----+-----+-----+-----+ | | +-----+-----+-----+-----+ | | | | | v v | | | | | | 20 | 4 | 5 | 13 | +-----+-----+-----+-----+ + + -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- + | 99 | | | | 11 12 13 + + -- -- -- -- -- -- -- -- -- - + + -- -- -- -- -- -- -- -- -- -- + | | | | | | | | | | + -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- - + - + 12 14 15 16 | | | | | | 3 | 3 | 3 | | 10 + + -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- + + + -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- +Copy the code
As with binary heaps, the only requirement for a node is the size relationship between its parent and its children. There is no relationship between adjacent nodes.
Time heap insertion
// Allocate timerBucket // lock, Func addTimer (t *timer) {TB := t.assignbucket () lock(&tb.lock) tb.addTimerLocked (t) unlock(&tb.lock)} // It's so easy, TimerBucket func (t *timer) assignBucket() *timersBucket {id := Uint8 (getg().m.p.ptr().id) % timersLen t.tb = &timers[id].timersbucket return t.tb} If the time heap is initialized for the first time or the current timer is older than all previous timers, Func (TB *timersBucket) addtimerLocked(t *timer) {// When must be greater than 0, If t.hen < 0 {t.hen = 1<< 63-1} t.i = len(tb.t) t.t = append(tb.t, t) siftupTimer(tb.t, T.i. == 0 {// The new timer is inserted earlier than all the previous ones // Adjust the heap upward if tb.sleeping {// Change the sleep state of timerBucket tb.sleeping = false // Wake up timerproc // so that the for loop in timerProc no longer blocks on notesleepg notewakeUp (&tb.waitnote)} // all timers on the same P if // are present Timerproc will pop // that rescheduling will be marked as true // and the goroutine that started timerProc will be goparkunlock if tb.rescheduling {// This flag is set to false here and in timejumpLocked tb.rescheduling = false goReady (tb.gp, 0)}} Start a Goroutine // to loop out the time heap, and the internal timer will be triggered according to the need for the earliest time // corresponding time sleep if! tb.created { tb.created = true go timerproc(tb) } }Copy the code
The logic for inserting a timer into the heap is to append to the end of the array (Append) and then adjust(siftup) heap to restore the quad small top heap nature.
Time heap deletion
Func delTimer (t *timer) bool {if t.tb == nil {// t.tb can be nil if the user created a timer // directly, without invoking startTimer e.g // time.Ticker{C: c} // In this case, return early without any deletion. // See Issue 21874. return false } tb := t.tb lock(&tb.lock) // t may not be registered anymore and may have // a bogus i (typically 0, if generated by Go). // Verify it before proceeding. i := t.i last := len(tb.t) - 1 if i < 0 || i > last || tb.t[i] ! = t {unlock(&tb.lock) return false} timer[I] = timer[last] if I! = last {tb.t[I] = tb.t[last] tb.t[I]. T [:last] = nil tb.t = tb.t[:last] = last {// The current fork of the last node may not be its fork // so it is possible to go up or down // even for binary heap, T, I) siftdownTimer(Tb.t, I)} UNLOCK (&tb.lock) return true}Copy the code
The timer is triggered
// TimerProc handles time-driven events. // It will sleep until the next event in the heap needs to be triggered. // If addTimer inserts an earlier event, Timerproc func timerProc (TB *timersBucket) {tb.gp = getg() for {// timerBucket partial lock(&tb.lock) // timerBucket partial lock(&tb.lock) Tb.sleeping = false // timing now := nanotime() delta := int64(-1) // Before processing expired timer, Keep loop for {// If the timer has already popped // then don't loop, If len(tb.t) == 0 {delta = -1 break} // Take the top element of the small heap (t := tb.t[0] delta = t.hen -now // If delta > 0 {// If delta is greater than 0, this timer is not ready to trigger // Break the loop and go to sleep break} if t.period > 0 {// This timer T.hen += t.period * (1 + -delta/t.period) siftdownTimer(tb.t, 0)} else {// Remove the timer from the heap // override the 0th timer with the last timer // then adjust the heap downward := len(tb.t) -1 if last > 0 {tb.t[0] = tb.t[last] tb.t[0].i = 0 } tb.t[last] = nil tb.t = tb.t[:last] if last > 0 { siftdownTimer(tb.t, 0)} t.i = -1 // Mark the position of the timer in the heap has been lost} // The function that needs to be called when the timer is triggered f := t.jar := t.argseq := t.eq unlock(&tb.lock) // Call f(arg, seq) // add the lock back If the next break the inner for loop / / guarantee timeBucket is locked / / and then is to unlock the lock in the goparkunlock below (& TB. Lock)} if the delta < 0 | | faketime > 0 {// There is no timer in the time heap. Tb.rescheduling = true goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1) continue} // Indicate that there is at least one more timer in the heap // Sleeping = true tb.sleepuntil = now + delta Noteclear (&tb.waitnote) unlock(&tb.lock) // Inside is futex sleep // Will wake up notetSleepg (&tb.waitnote, delta)}}Copy the code
If you look at the period here, a timer with a period starts when, and every period, it fires again.
when+period
when+period*3
|
| |
| |
| when+period*2|
when | |
| | |
| | | | .....
| | | |
+-----------+ | | | |
| timeline +------------------------+-------+-------+-------+----------------->
+-----------+ | | | |
v v v v
trigger trigger
trigger trigger
Copy the code
Time stack adjustment
As the previous code saw, the time heap can be adjusted up or down.
Adjusted upward
func siftupTimer(t []*timer, When := t[I]. When TMP := t[I] // Start with the parent of the newly inserted node // If the newly inserted node is triggered earlier than the parent node // For I > 0 {p := (i-1) / 4 // parent when >= t[p]. When {break} t[I] = t[p] t[I]. I = I I = p} // If a move has occurred, override the last parent that moved down with the newly inserted node if TMP! = t[i] { t[i] = tmp t[i].i = i } }Copy the code
Downward adjustments
func siftdownTimer(t []*timer, I int) {n := len(t) when := t[I]. When TMP := t[I] for {c := I *4 + 1 // c3 := c + 2 // third child if c >= n {break } w := t[c].when if c+1 < n && t[c+1].when < w { w = t[c+1].when c++ } if c3 < n { w3 := t[c3].when if c3+1 < n && t[c3+1].when < w3 { w3 = t[c3+1].when c3++ } if w3 < w { w = w3 c = c3 } } if w >= when { break } t[i] = t[c] t[i].i = i i = c } if tmp ! = t[i] { t[i] = tmp t[i].i = i } }Copy the code
This code is not very elegant. It simply finds the smallest of all the child nodes and breaks if the smallest is larger than the node being moved down. Otherwise, the smallest node is moved up, and then determine whether the four children of the smallest node are all larger than the node to be moved down. And so on. Here’s a diagram to simulate the process:
| +---+ | |*5 | | +---+ | | | +-----+ | v | +---+---+---+---+ | | 7 | 3 |*2 | 6 | | +---+---+---+---+ +-------------------+ | | | siftdownTimer | | +----------+ +-------------------+ | v .---------. | +---+---+---+---+ ( before ) | | 4 | 5 | 9 |*3 | `---------' | +---+---+---+---+ | | | +-------------+ | v | +---+---+---+---+ | | 6 | 6 | 6 |*4 | | +---+---+---+---+ | v | +---+ | |*2 | | +---+ | | | +-----+ | v | +---+---+---+---+ | | 7 | 3 |*3 | 6 | | +---+---+---+---+ +-------------------+ | | | siftdownTimer | | +----------+ +-------------------+ | v .---------. | +---+---+---+---+ ( after ) | | 4 | 5 | 9 |*4 | `---------' | +---+---+---+---+ | | | +-------------+ | v | +---+---+---+---+ | | 6 | 6 | 6 |*5 | | +---+---+---+---+ | vCopy the code
process
The timer. After process
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
func startTimer(*runtimeTimer)
Copy the code
The startTimer implementation is in runtime/time.go:
// startTimer adds t to the timer heap. // Add t to the timer heap. //go: linkName startTimer time *timer) { addtimer(t) }Copy the code
The process behind AddTimer has been seen before.
The timer. Tick process
func Tick(d Duration) <-chan Time {
if d <= 0 {
return nil
}
return NewTicker(d).C
}
Copy the code
// NewTicker returns a Ticker object whose channel receives a time value every period // If receiver is slow, The Ticker drops unwanted ticks // d must be greater than 0, Func NewTicker(d Duration) * ticker {if d <= 0 {panic(errors.New("non-positive interval for NewTicker")) } c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d), period: int64(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t }Copy the code
It can be seen that the only difference between Ticker and r member of Timer is the period field. Ticker sends data to channel every other period, while Fire and Disappear is Timer.
Stop the process
func (t *Ticker) Stop() {
stopTimer(&t.r)
}
func (t *Timer) Stop() bool {
if t.r.f == nil {
panic("time: Stop called on uninitialized Timer")
}
return stopTimer(&t.r)
}
Copy the code
Both Timer and Ticker are stopTimer calls.
func stopTimer(t *timer) bool {
return deltimer(t)
}
Copy the code
Deltimer saw that up there as well.
The Reset process
func (t *Timer) Reset(d Duration) bool {
if t.r.f == nil {
panic("time: Reset called on uninitialized Timer")
}
w := when(d)
active := stopTimer(&t.r)
t.r.when = w
startTimer(&t.r)
return active
}
Copy the code
They’re all familiar functions, nothing special.
The last
This article is mainly about Go timer implementation, the industrial timer implementation is not only one kind. If you want to know how timers are implemented in other systems, such as Nginx, you can refer to this article.