Caching and publish/subscribe are two patterns commonly used in back-end development. A cache mainly stages data that can afford to be lost, while publish/subscribe is mainly used for messaging. Today we look at a cache with subscription support in Kubernetes (k8s). Its goal: given a point in time, a caller can wait for events that happen after that time. It is mainly used to obtain near-real-time state data.
1. Business background
The kubelet in Kubernetes supports different container runtimes. To cache all pods/containers visible in the container runtime, it builds a cache structure. When an event occurs and the kubelet receives it, the kubelet needs to obtain the current status of the Pod. That status has to be the most recent state after the event, not the state from before it.
2. Core implementation
2.1 Data and Subscription records
2.1.1 Status Data
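The status data stores the state of a Pod, the error (if any) from inspecting it, and the time it was last modified.
type data struct {
	// Status of the Pod.
	status *PodStatus
	// Error encountered while inspecting the Pod.
	err error
	// Time when the data was last modified.
	modified time.Time
}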
2.1.2 Subscription Records
A subscription record is a request to be notified of data through a channel. The time field is the filter condition: data is sent to the channel only when it is new enough relative to time.
type subRecord struct {
	time time.Time
	ch   chan *data
}
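The choice of one channel per subscription can be illustrated with a small standalone sketch. The types here are simplified stand-ins (a string status instead of *PodStatus), and deliver is a hypothetical helper, not a function from the kubelet. It assumes the channel is created with a buffer of size one, so the notifier can send without waiting for the receiver, and that the channel is closed after the single value is sent.

```go
package main

import (
	"fmt"
	"time"
)

// data is a simplified stand-in for the cached Pod state.
type data struct {
	status string
}

// subRecord mirrors the structure above: a time filter plus a channel.
type subRecord struct {
	time time.Time
	ch   chan *data
}

// deliver is a hypothetical helper: it sends one value and closes the
// channel. With a buffer of size 1 the send does not block the caller.
func deliver(r *subRecord, d *data) {
	r.ch <- d
	close(r.ch)
}

func main() {
	r := &subRecord{time: time.Now(), ch: make(chan *data, 1)}
	deliver(r, &data{status: "Running"})

	// The buffered value is still readable after the channel is closed.
	d, ok := <-r.ch
	fmt.Println(d.status, ok)

	// A second receive reports that the channel is drained and closed.
	_, ok = <-r.ch
	fmt.Println(ok)
}
```

Because the value sits in the buffer, the receiver can pick it up at any later time, which keeps sender and receiver decoupled.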
2.2 Cache implementation
2.2.1 Core member structure
When the kubelet's PLEG updates the cache, it fetches the latest Pod status to fill the cache and also updates the global timestamp.
// cache implements Cache.
type cache struct {
	// Read/write lock guarding all internal data structures.
	lock sync.RWMutex
	// Stores Pod status data; used for reads that carry no timestamp requirement.
	pods map[types.UID]*data
	// Global timestamp: all data currently in the cache is at least as new as this time.
	timestamp *time.Time
	// Stores the subscription records for each Pod.
	subscribers map[types.UID][]*subRecord
}
2.2.3 Obtaining ordinary status data
An ordinary status read returns the data directly from the map.
func (c *cache) Get(id types.UID) (*PodStatus, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	d := c.get(id)
	return d.status, d.err
}
2.2.4 Default state constructor
When the cache holds no data for the given ID, default status data is generated from the ID.
func (c *cache) get(id types.UID) *data {
	d, ok := c.pods[id]
	if !ok {
		return makeDefaultData(id)
	}
	return d
}

// Default state constructor.
func makeDefaultData(id types.UID) *data {
	return &data{status: &PodStatus{ID: id}, err: nil}
}
2.2.5 Obtaining the latest status data
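Given a timestamp, the cached data is considered valid only if it is newer than that timestamp; otherwise nil is returned. The key point here is the design of the global timestamp, which is updated in every PLEG cycle. If the global timestamp is later than minTime, the cache as a whole is already newer than the request, so the stored status (or a default status, when the Pod has no entry) can be returned.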
func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
	// Get the current status.
	d, ok := c.pods[id]
	// Whether the global timestamp is later than the given time.
	globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
	if !ok && globalTimestampIsNewer {
		// The status is not cached, but the global timestamp is newer
		// than minTime, so return the default status.
		return makeDefaultData(id)
	}
	if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
		// The status is cached and either it was modified after minTime
		// or the global timestamp is newer than minTime.
		return d
	}
	// The pod status is not ready.
	return nil
}
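The three possible outcomes of this check can be restated as a small standalone function. This is only a sketch: decide, outcome and at are hypothetical names introduced for illustration, and the inputs are passed as plain arguments instead of being read from the cache.

```go
package main

import (
	"fmt"
	"time"
)

// outcome names the three possible results of the freshness check.
type outcome string

const (
	returnCached  outcome = "cached"
	returnDefault outcome = "default"
	notReady      outcome = "not ready"
)

// at is a small helper that builds a time from a number of seconds.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

// decide restates the checks in getIfNewerThan in simplified form.
// cached: whether the Pod has an entry; modified: when it was last written;
// global: the cache-wide timestamp (nil until the first update).
func decide(cached bool, modified time.Time, global *time.Time, minTime time.Time) outcome {
	globalIsNewer := global != nil && global.After(minTime)
	if !cached && globalIsNewer {
		return returnDefault
	}
	if cached && (modified.After(minTime) || globalIsNewer) {
		return returnCached
	}
	return notReady
}

func main() {
	g := at(2)
	fmt.Println(decide(false, at(0), nil, at(1))) // no entry, no global time: not ready
	fmt.Println(decide(false, at(0), &g, at(1)))  // no entry, newer global time: default
	fmt.Println(decide(true, at(0), nil, at(1)))  // stale entry: not ready
	fmt.Println(decide(true, at(2), nil, at(1)))  // entry modified after minTime: cached
	fmt.Println(decide(true, at(0), &g, at(1)))   // stale entry, newer global time: cached
}
```

The last case is the one that depends on the global timestamp: the entry itself was not touched, but the cache as a whole has been confirmed as newer than the request.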
2.2.6 Subscription status pipeline construction
subscribe ultimately returns a channel of status data. It first checks whether qualifying data is already available; if so, the data is put directly into the channel. Otherwise a subRecord subscription record is created and saved.
func (c *cache) subscribe(id types.UID, timestamp time.Time) chan *data {
	ch := make(chan *data, 1)
	c.lock.Lock()
	defer c.lock.Unlock()
	// Try to obtain status data that is already new enough.
	d := c.getIfNewerThan(id, timestamp)
	if d != nil {
		// Qualifying data already exists: send it and return immediately.
		ch <- d
		return ch
	}
	// Otherwise add a subscription record to the subscribers list.
	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
	return ch
}
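How a caller might consume the returned channel can be sketched with a reduced cache. This is a minimal sketch under assumptions, not the kubelet code: IDs and statuses are plain strings, the global timestamp and default-data branch are left out, and newCache, set and waitNewer are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type data struct {
	status   string
	modified time.Time
}

type subRecord struct {
	time time.Time
	ch   chan *data
}

type cache struct {
	lock        sync.Mutex
	pods        map[string]*data
	subscribers map[string][]*subRecord
}

func newCache() *cache {
	return &cache{pods: map[string]*data{}, subscribers: map[string][]*subRecord{}}
}

// subscribe returns a channel that will carry one value newer than ts.
func (c *cache) subscribe(id string, ts time.Time) chan *data {
	ch := make(chan *data, 1)
	c.lock.Lock()
	defer c.lock.Unlock()
	if d, ok := c.pods[id]; ok && d.modified.After(ts) {
		ch <- d // fresh data is already cached: answer immediately
		return ch
	}
	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: ts, ch: ch})
	return ch
}

// set stores a status and wakes the subscribers whose time has been reached.
func (c *cache) set(id, status string, ts time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	d := &data{status: status, modified: ts}
	c.pods[id] = d
	var pending []*subRecord
	for _, r := range c.subscribers[id] {
		if ts.Before(r.time) {
			pending = append(pending, r) // not new enough yet: keep waiting
			continue
		}
		r.ch <- d
		close(r.ch)
	}
	c.subscribers[id] = pending
}

// waitNewer blocks until a status newer than ts is available.
func (c *cache) waitNewer(id string, ts time.Time) string {
	return (<-c.subscribe(id, ts)).status
}

func main() {
	c := newCache()
	event := time.Now()
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.set("pod-1", "Running", event.Add(time.Second))
	}()
	fmt.Println(c.waitNewer("pod-1", event)) // blocks until set is called
}
```

The lock is held only while registering the subscription; the caller waits on the channel outside the lock, so writers are not blocked by waiting readers.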
2.2.7 Notifying subscribers and clearing fulfilled subscriptions
notify checks the subscription time of each subRecord. If the given timestamp has reached or passed the subscription time, the current data is fetched, sent to the channel, and the channel is closed. Only the subscription records whose time has not yet been reached are kept.
func (c *cache) notify(id types.UID, timestamp time.Time) {
	// Get the subscription list for this ID.
	list, ok := c.subscribers[id]
	if !ok {
		// No one to notify.
		return
	}
	newList := []*subRecord{}
	for i, r := range list {
		// If timestamp is before the subscription time, the requirement
		// is not met yet: keep the record and do nothing else.
		if timestamp.Before(r.time) {
			newList = append(newList, list[i])
			continue
		}
		r.ch <- c.get(id)
		close(r.ch)
	}
	if len(newList) == 0 {
		// No subscriptions left for this ID.
		delete(c.subscribers, id)
	} else {
		// Keep the remaining subscription list.
		c.subscribers[id] = newList
	}
}
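The comparison used to decide which subscriptions stay pending can be isolated in a short sketch. split and at are hypothetical helpers for illustration; they work on bare timestamps instead of subRecord values. The example shows the boundary case: a subscription whose time equals the notification timestamp is treated as fulfilled, because only a strictly earlier timestamp keeps it pending.

```go
package main

import (
	"fmt"
	"time"
)

// at is a small helper that builds a time from a number of seconds.
func at(sec int64) time.Time { return time.Unix(sec, 0) }

// split applies the same comparison as notify: a subscription stays
// pending only while ts is strictly before its subscription time.
func split(ts time.Time, subTimes ...time.Time) (ready, pending []time.Time) {
	for _, t := range subTimes {
		if ts.Before(t) {
			pending = append(pending, t)
			continue
		}
		ready = append(ready, t)
	}
	return ready, pending
}

func main() {
	// Subscriptions at 0s, 5s and 10s; notification timestamp at 5s.
	ready, pending := split(at(5), at(0), at(5), at(10))
	fmt.Println(len(ready), len(pending)) // prints "2 1"
}
```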
2.2.8 Global timestamp update
When the global timestamp is updated, all subscriptions are traversed and notified using the latest global timestamp.
func (c *cache) UpdateTime(timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.timestamp = &timestamp
	// Notify all the subscribers if the condition is met.
	for id := range c.subscribers {
		c.notify(id, *c.timestamp)
	}
}
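One effect of the global timestamp is that a subscriber waiting on a Pod with no cached entry is still released once the cache as a whole is known to be newer than the request. The following is a reduced sketch under assumptions: statuses are plain strings, the default status is a placeholder string, and newCache, status and updateTime are illustrative names, not the kubelet implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type subRecord struct {
	time time.Time
	ch   chan string
}

// cache is a reduced stand-in holding statuses, the global timestamp
// and the subscription records.
type cache struct {
	lock        sync.Mutex
	pods        map[string]string
	timestamp   *time.Time
	subscribers map[string][]*subRecord
}

func newCache() *cache {
	return &cache{pods: map[string]string{}, subscribers: map[string][]*subRecord{}}
}

// status returns the cached status, or a placeholder default if none exists.
func (c *cache) status(id string) string {
	if s, ok := c.pods[id]; ok {
		return s
	}
	return "unknown (default)"
}

func (c *cache) subscribe(id string, ts time.Time) chan string {
	ch := make(chan string, 1)
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.timestamp != nil && c.timestamp.After(ts) {
		ch <- c.status(id) // the cache is already newer than the request
		return ch
	}
	c.subscribers[id] = append(c.subscribers[id], &subRecord{time: ts, ch: ch})
	return ch
}

// updateTime advances the global timestamp and releases every
// subscription whose time has been reached.
func (c *cache) updateTime(ts time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.timestamp = &ts
	for id, list := range c.subscribers {
		var pending []*subRecord
		for _, r := range list {
			if ts.Before(r.time) {
				pending = append(pending, r)
				continue
			}
			r.ch <- c.status(id)
			close(r.ch)
		}
		if len(pending) == 0 {
			delete(c.subscribers, id)
		} else {
			c.subscribers[id] = pending
		}
	}
}

func main() {
	c := newCache()
	start := time.Now()
	ch := c.subscribe("pod-gone", start) // nothing cached, no global time yet
	c.updateTime(start.Add(time.Second)) // a later cycle advances the global time
	fmt.Println(<-ch)                    // the default status is delivered
}
```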
2.2.9 Pod event update notification function
When a Pod's status is updated, the new data is written to the map and notify is then called for that Pod.
func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Notify subscribers once the new data has been stored.
	defer c.notify(id, timestamp)
	c.pods[id] = &data{status: status, err: err, modified: timestamp}
}
That's all for today. There is a lot to learn from these data structures and designs. I hope we can exchange ideas and learn about cloud-native design and key implementations together.
Wechat id: Baxiaoshi2020
Follow the WeChat official account to read more source code analysis articles
More articles can be found at www.sreguide.com