Caching and publish/subscribe are both common techniques in back-end development. A cache mainly stages data that can afford to be lost, while publish/subscribe is mainly used for message passing. Today I'll introduce a cache with subscription support from k8s. Its goal is this: given a point in time, a caller waits for the state produced by events after that time. It is mainly used to obtain near-real-time state data.

1. Business background

The kubelet in K8s supports different container runtimes. To cache all pods/containers visible in the container runtime, it builds a Cache structure. When an event occurs, the kubelet receives it and then needs the current status of the Pod. That status must be the most recent one, observed after the event, not a stale state from before it.

2. Core implementation

2.1 Data and Subscription records

2.1.1 Status Data

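The status data stores the state of a single Pod: its PodStatus, the error (if any) returned while inspecting the Pod, and the time the entry was last modified.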

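```go
type data struct {
	// Status of the pod.
	status *PodStatus
	// Error returned while inspecting the pod.
	err error
	// Time when the data was last modified.
	modified time.Time
}
```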

2.1.2 Subscription Records

A subscription record is a pending request to be notified of data through a channel (chan). The time field is the filter condition: the record is fulfilled only by data that is no older than time.

```go
type subRecord struct {
	time time.Time
	ch   chan *data
}
```

2.2 The Cache implementation

2.2.1 Core member structure

When the kubelet's PLEG (Pod Lifecycle Event Generator) updates the cache, it retrieves the latest Pod status to fill the cache and also updates the global timestamp.

```go
// cache implements Cache.
type cache struct {
	// Read/write lock guarding all internal data structures.
	lock sync.RWMutex
	// Pod status data keyed by Pod UID; Get reads it without any timestamp.
	pods map[types.UID]*data
	// Global timestamp: all data in the cache is at least as new as this.
	// It stays nil until the cache is ready to serve statuses.
	timestamp *time.Time
	// Subscription records keyed by Pod UID.
	subscribers map[types.UID][]*subRecord
}
```
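Both maps must be initialised before use, because writing to a nil Go map panics. The sketch below assumes a constructor modelled on the kubelet's NewCache (called newCache here and hypothetical) that creates both maps and leaves timestamp nil, so the cache cannot yet answer time-bounded queries on the strength of the global timestamp alone. UID and PodStatus are simplified stand-ins.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Simplified stand-ins for types.UID and the kubelet's PodStatus.
type UID string

type PodStatus struct{ ID UID }

type data struct {
	status   *PodStatus
	err      error
	modified time.Time
}

type subRecord struct {
	time time.Time
	ch   chan *data
}

type cache struct {
	lock        sync.RWMutex
	pods        map[UID]*data
	timestamp   *time.Time
	subscribers map[UID][]*subRecord
}

// newCache (hypothetical name) initialises both maps so later writes do
// not panic; timestamp stays nil until the first UpdateTime call.
func newCache() *cache {
	return &cache{
		pods:        map[UID]*data{},
		subscribers: map[UID][]*subRecord{},
	}
}

func main() {
	c := newCache()
	fmt.Println(c.timestamp == nil, len(c.pods), len(c.subscribers)) // true 0 0
}
```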

2.2.3 Getting the current status data

A normal status lookup returns the data directly from the map.

```go
func (c *cache) Get(id types.UID) (*PodStatus, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	d := c.get(id)
	return d.status, d.err
}
```
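Because Get takes only the read lock, many goroutines can read the cache at the same time, and only writers serialize. A minimal sketch with simplified stand-in types and a cache struct trimmed down to the lock and the pods map; the pod ID is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Simplified stand-ins for types.UID and the kubelet's PodStatus.
type UID string

type PodStatus struct{ ID UID }

type data struct {
	status   *PodStatus
	err      error
	modified time.Time
}

// Trimmed-down cache: only what Get needs.
type cache struct {
	lock sync.RWMutex
	pods map[UID]*data
}

func (c *cache) get(id UID) *data {
	if d, ok := c.pods[id]; ok {
		return d
	}
	return &data{status: &PodStatus{ID: id}}
}

// Get holds only the read lock, so concurrent readers do not block each other.
func (c *cache) Get(id UID) (*PodStatus, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	d := c.get(id)
	return d.status, d.err
}

func main() {
	c := &cache{pods: map[UID]*data{"pod-a": {status: &PodStatus{ID: "pod-a"}}}}
	results := make([]UID, 8)
	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s, _ := c.Get("pod-a")
			results[i] = s.ID // each goroutine writes only its own slot
		}(i)
	}
	wg.Wait()
	fmt.Println(results[0], results[7]) // pod-a pod-a
}
```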

2.2.4 Default state constructor

When the cache holds no data for the given ID, a default status is generated from the ID.

```go
func (c *cache) get(id types.UID) *data {
	d, ok := c.pods[id]
	if !ok {
		return makeDefaultData(id)
	}
	return d
}

// Default state constructor
func makeDefaultData(id types.UID) *data {
	return &data{status: &PodStatus{ID: id}, err: nil}
}
```
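Note that the default entry is returned but never stored, so a miss does not grow the map. The sketch below (stand-in types, a cache struct trimmed to the pods map, a hypothetical pod ID) shows a miss returning a status whose ID is set, a nil error, and a zero modified time.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for types.UID and the kubelet's PodStatus.
type UID string

type PodStatus struct{ ID UID }

type data struct {
	status   *PodStatus
	err      error
	modified time.Time
}

type cache struct{ pods map[UID]*data }

func makeDefaultData(id UID) *data {
	return &data{status: &PodStatus{ID: id}, err: nil}
}

// get falls back to default data on a miss; the default is NOT cached.
func (c *cache) get(id UID) *data {
	d, ok := c.pods[id]
	if !ok {
		return makeDefaultData(id)
	}
	return d
}

func main() {
	c := &cache{pods: map[UID]*data{}}
	d := c.get("pod-x")
	fmt.Println(d.status.ID, d.err == nil, d.modified.IsZero(), len(c.pods)) // pod-x true true 0
}
```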

2.2.5 Obtaining the latest status data

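Given a timestamp minTime, the cached data is valid only if it is newer than minTime; otherwise nil is returned. The key point here is the design of the global timestamp, which is updated in every PLEG cycle.

If the Pod has no cached entry but the global timestamp is after minTime, a default status is returned. If the Pod has a cached entry and either its modified time or the global timestamp is after minTime, the cached entry is returned. In all other cases the status is not ready yet and nil is returned.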

```go
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
	// Get the current cached status
	d, ok := c.pods[id]
	// Whether the global timestamp is later than the given time
	globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
	if !ok && globalTimestampIsNewer {
		// The status is not cached, but the global timestamp is newer
		// than minTime, so return the default status.
		return makeDefaultData(id)
	}
	if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
		// The status is cached, and either it was modified after minTime
		// or the global timestamp is newer than minTime.
		return d
	}
	// The pod status is not ready.
	return nil
}
```

2.2.6 Building the subscription channel

subscribe returns a channel of status data. If data newer than the given timestamp is already available, it is sent into the channel right away; otherwise a subRecord subscription record is created and saved.

```go
func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
	ch := make(chan *data, 1)
	c.lock.Lock()
	defer c.lock.Unlock()
	// Look up status data newer than timestamp
	d := c.getIfNewerThan(id, timestamp)
	if d != nil {
		// Data is already available: send it and return immediately
		ch <- d
		return ch
	}
	// Otherwise add a subscription record to the subscriber list
	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
	return ch
}
```

2.2.7 Notifying subscribers and clearing fulfilled channels

notify checks each subRecord against its subscription time. If the given timestamp is not earlier than the subscription time, the current data is fetched, sent into the record's channel, and the channel is closed. Only the subscription records that are still waiting (whose time is after the timestamp) are kept.

```go
func (c *cache) notify(id types.UID, timestamp time.Time) {
	// Get the subscription list for this id
	list, ok := c.subscribers[id]
	if !ok {
		// No one to notify.
		return
	}
	newList := []*subRecord{}
	for i, r := range list {
		// The subscriber wants data newer than timestamp: keep waiting
		if timestamp.Before(r.time) {
			newList = append(newList, list[i])
			continue
		}
		// Deliver the current data and close the channel
		r.ch <- c.get(id)
		close(r.ch)
	}
	if len(newList) == 0 {
		// Nothing left waiting: drop the entry
		delete(c.subscribers, id)
	} else {
		// Keep only the remaining subscriptions
		c.subscribers[id] = newList
	}
}
```

2.2.8 Global timestamp update

When the global timestamp is updated, the cache iterates over all subscribed Pods and calls notify with the new global timestamp.

```go
func (c *cache) UpdateTime(timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.timestamp = &timestamp
	// Notify all the subscribers if the condition is met.
	for id := range c.subscribers {
		c.notify(id, *c.timestamp)
	}
}
```

2.2.9 Pod event update notification function

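Set stores the new status for a Pod and then, through a deferred call, runs notify with the update's timestamp, so subscribers see the newly written entry.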

```go
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Deferred calls run in LIFO order: notify runs after the write below
	// and before the lock is released.
	defer c.notify(id, timestamp)
	c.pods[id] = &data{status: status, err: err, modified: timestamp}
}
```

That's all for today. There is a lot to learn from these data structures and design choices. I hope we can keep exchanging ideas and learn about cloud-native designs and their key implementations together.

WeChat ID: Baxiaoshi2020

Follow the WeChat official account to read more source-code analysis articles.

More articles can be found at www.sreguide.com
