Background

Main functionality

All data operations in K8s, that is add, delete, update, get, and watch, go through the apiserver. To avoid putting that access pressure directly on etcd, K8s abstracts a Cacher struct: all etcd events are distributed and served through this object, which wraps the etcd client as a single storage shared by many consumers.
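The fan-out at the heart of this, one upstream event source copied to many downstream watchers, can be shown with a toy snippet (an illustration in plain Go, not the real apiserver code):

package main

import "fmt"

// fanOut copies every event from one upstream source (think: the etcd
// watch) to every subscriber (think: one channel per cacheWatcher).
func fanOut(upstream <-chan string, subscribers []chan string) {
    for ev := range upstream {
        for _, sub := range subscribers {
            sub <- ev
        }
    }
    for _, sub := range subscribers {
        close(sub)
    }
}

func main() {
    upstream := make(chan string, 2)
    subs := []chan string{make(chan string, 2), make(chan string, 2)}

    upstream <- "ADDED pod/a"
    upstream <- "MODIFIED pod/a"
    close(upstream)

    go fanOut(upstream, subs)
    for ev := range subs[0] {
        fmt.Println("watcher-0 got:", ev) // watcher-1 receives the same events
    }
}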

Functional breakdown

  • Store: provides the data-operation interfaces: add, delete, update, get, and Watch
  • Watcher: listens for changes to the Store's data and processes them

Note that Store and Watcher are general abstractions over data operations: there is a Store and Watcher for etcd, a Store and Watcher for the cache, and a Store and Watcher for REST.

Mental model

  • Store: provides a unified interface that hides how and where the underlying data is stored
  • Watcher: obtains change events from the Store and runs custom processing on them
  • Client: to build a new feature, a client only needs to start a new Watcher for the data changes it cares about and implement its custom logic (a sketch of these roles follows this list)

    Viewed more broadly, this decouples the data-storage layer (the store framework) from the business-logic layer.
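A minimal sketch of those three roles (the names and signatures here are illustrative stand-ins, not the actual k8s interfaces):

// Event is a single change notification.
type Event struct {
    Type   string      // "ADDED", "MODIFIED", "DELETED"
    Object interface{} // the object that changed
}

// Watcher streams change events until it is stopped.
type Watcher interface {
    ResultChan() <-chan Event
    Stop()
}

// Store hides where the data actually lives: the same interface can be
// implemented by the raw etcd store and by the Cacher wrapped around it.
type Store interface {
    Create(key string, obj interface{}) error
    Delete(key string) error
    Get(key string) (interface{}, error)
    List(prefix string) ([]interface{}, error)
    Watch(key string, resourceVersion string) (Watcher, error)
}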

Detailed call diagram

(The original diagram is omitted; the flow it depicts is: etcd → Reflector.ListAndWatch → watchCache → Cacher.processEvent → incoming channel → dispatchEvents → cacheWatcher → client.)

Code implementation

Key data structures

  • Cacher interacts with the back-end etcd Store and distributes events to all watchers

    type Cacher struct {
        // High-water mark of the incoming event queue length
        incomingHWM HighWaterMark
        // Incoming event channel; events here are dispatched to all watchers
        incoming chan watchCacheEvent

        sync.RWMutex

        // The cache may only be accessed once ready reports OK
        ready *ready

        // Back-end storage interface
        storage Store

        // Type of the cached objects
        objectType reflect.Type

        watchCache watchCache
        reflector  *Reflector

        versioner Versioner

        triggerFunc TriggerPublisherFunc
        watcherIdx  int
        watchers    indexedWatchers

        dispatchTimeoutBudget *timeBudget

        stopLock sync.RWMutex
        stopped  bool
        stopCh   chan struct{}
        stopWg   sync.WaitGroup
    }
  • cacheWatcher receives events dispatched by the Cacher and forwards them to the REST/WebSocket layer

    // cacheWatcher implements the watch interface
    type cacheWatcher struct {
        sync.Mutex
        input     chan *watchCacheEvent
        result    chan Event
        done      chan struct{}
        filter    filterWithAttrsFunc
        stopped   bool
        forget    func(bool)
        versioner Versioner
    }
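    The REST layer would drain such a watcher roughly like this (a hedged sketch: ResultChan and Stop mirror the watch.Interface contract, but the serving loop itself is illustrative):

    // ResultChan exposes the result channel, as watch.Interface requires
    func (c *cacheWatcher) ResultChan() <-chan Event {
        return c.result
    }

    // serveWatchSketch streams events to one client until the watcher is
    // closed (e.g. the cache re-lists) or the client goes away
    func serveWatchSketch(w *cacheWatcher, clientGone <-chan struct{}) {
        defer w.Stop()
        for {
            select {
            case ev, ok := <-w.ResultChan():
                if !ok {
                    return
                }
                _ = ev // encode ev onto the HTTP/WebSocket stream here
            case <-clientGone:
                return
            }
        }
    }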
  • Reflector watches the backend, converts the data into events, and feeds them to the watchCache (the watchCache is handed to the Reflector as its Store)

    // Reflector lists and watches the backend
    type Reflector struct {
        name string

        expectedType reflect.Type

        store Store

        listerWatcher ListerWatcher
    }
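ListAndWatch is stubbed out in the complete listing below, so here is a self-contained sketch of what it conceptually does: list a consistent snapshot into the store, then watch from that resourceVersion (all names and signatures here are simplified stand-ins, not the real client-go code):

// Event and Cache are illustrative stand-ins for the article's types.
type Event struct {
    Type   string // "ADDED", "MODIFIED", "DELETED"
    Object interface{}
}

type Cache interface {
    Replace(items []interface{}, resourceVersion string) error
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
}

// listAndWatch: list a snapshot, replace the cache with it, then apply
// the subsequent event stream. A hedged paraphrase of the real loop.
func listAndWatch(
    list func() (items []interface{}, rv string, err error),
    watch func(rv string) (<-chan Event, error),
    cache Cache,
    stopCh <-chan struct{},
) error {
    items, rv, err := list()
    if err != nil {
        return err
    }
    if err := cache.Replace(items, rv); err != nil {
        return err
    }
    events, err := watch(rv)
    if err != nil {
        return err
    }
    for {
        select {
        case ev, ok := <-events:
            if !ok {
                return nil // stream closed; the caller re-lists and retries
            }
            switch ev.Type {
            case "ADDED":
                cache.Add(ev.Object)
            case "MODIFIED":
                cache.Update(ev.Object)
            case "DELETED":
                cache.Delete(ev.Object)
            }
        case <-stopCh:
            return nil
        }
    }
}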

Key method implementations

func NewCacherFromConfig(config Config) *Cacher {
    // First build a watchCache
    watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
    listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
    reflectorName := "storage/cacher.go:" + config.ResourcePrefix

    stopCh := make(chan struct{})
    cacher := &Cacher{
        ready:      newReady(),
        storage:    config.Storage,
        objectType: reflect.TypeOf(config.Type),
        watchCache: watchCache,
        // watchCache is handed to the Reflector as its Store; once the
        // Reflector fetches data from etcd it turns it into events
        reflector: cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
        versioner: config.Versioner,
        stopCh:    stopCh,
    }
    // Register cacher.processEvent as the watchCache OnEvent hook:
    // every watchCache event is handed to processEvent
    watchCache.SetOnEvent(cacher.processEvent)
    go cacher.dispatchEvents()

    cacher.stopWg.Add(1)
    go func() {
        defer cacher.stopWg.Done()
        wait.Until(
            func() {
                if !cacher.isStopped() {
                    cacher.startCaching(stopCh)
                }
            }, time.Second, stopCh,
        )
    }()
    return cacher
}
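The Config consumed above never appears in the article. Judging from the fields NewCacherFromConfig reads, it would look roughly like this (a reconstruction in the article's own vocabulary, not the verbatim k8s.io/apiserver definition):

type Config struct {
    // The underlying (etcd) store being wrapped
    Storage Store
    // Capacity of the sliding window of events kept in the watch cache
    CacheCapacity int
    // Key prefix of the resource, e.g. "/pods"
    ResourcePrefix string
    // KeyFunc computes the storage key of an object
    KeyFunc func(obj Object) (string, error)
    // GetAttrsFunc extracts the labels/fields used for filtering
    GetAttrsFunc AttrFunc
    // NewListFunc returns an empty list object of the cached type
    NewListFunc func() Object
    // An example object of the cached type
    Type Object
    // Versioner parses and updates resourceVersions
    Versioner Versioner
}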
  • processEvent pushes each event into the Cacher's incoming queue (see the illustration after the code)

    func (c *Cacher) processEvent(event *watchCacheEvent) {
        if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
            fmt.Printf("cacher %v: %v objects queued in incoming channel\n", c.objectType.String(), curLen)
        }
        c.incoming <- *event
    }
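    Update only returns true when the queue length sets a new record, so the log line fires once per new high-water mark, not once per event. A quick illustration (a toy main, assuming the HighWaterMark type from the complete listing below):

    func main() {
        var hwm HighWaterMark // zero value: current mark is 0
        for _, n := range []int64{3, 1, 5, 5, 2, 9} {
            if hwm.Update(n) {
                fmt.Printf("new high-water mark: %d\n", n) // fires for 3, 5, 9
            }
        }
    }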
  • dispatchEvents fans events out to the front-end watchers (a sketch of watcher.add follows the code)

    func (c *Cacher) dispatchEvents() {
        for {
            select {
            case event, ok := <-c.incoming:
                if !ok {
                    return
                }
                c.dispatchEvent(&event)
            case <-c.stopCh:
                return
            }
        }
    }

    func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
        triggerValues, supported := c.triggerValues(event)

        c.Lock()
        defer c.Unlock()

        // Every unfiltered watcher sees every event
        for _, watcher := range c.watchers.allWatchers {
            watcher.add(event, c.dispatchTimeoutBudget)
        }
        if supported {
            // Only the value watchers registered under a matching trigger
            // value need to see this event
            for _, triggerValue := range triggerValues {
                for _, watcher := range c.watchers.valueWatchers[triggerValue] {
                    watcher.add(event, c.dispatchTimeoutBudget)
                }
            }
        } else {
            // Trigger values unsupported: fall back to every value watcher
            for _, watchers := range c.watchers.valueWatchers {
                for _, watcher := range watchers {
                    watcher.add(event, c.dispatchTimeoutBudget)
                }
            }
        }
    }
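watcher.add is never shown in the article. Its contract: try a non-blocking send first, and if the watcher's input channel is full, wait at most what the shared dispatch budget allows before force-closing the slow watcher. A hedged sketch of that logic (takeAvailable and returnUnused are reconstructed in the complete listing below):

// A hedged sketch of cacheWatcher.add: the fast path is a non-blocking
// send; the slow path spends the shared dispatch budget, and a watcher
// that still cannot keep up is forcibly forgotten so that it cannot
// stall every other watcher.
func (c *cacheWatcher) add(event *watchCacheEvent, budget *timeBudget) {
    // Fast path: room in the input channel.
    select {
    case c.input <- event:
        return
    default:
    }

    // Slow path: wait, but only as long as the shared budget allows.
    startTime := time.Now()
    timeout := budget.takeAvailable()
    select {
    case c.input <- event:
        // Delivered in time; give the unspent part of the budget back.
        budget.returnUnused(timeout - time.Since(startTime))
    case <-time.After(timeout):
        // Still blocked: close this watcher rather than stall dispatch.
        c.forget(false)
        c.stop()
    }
}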

The complete code

package cacher

import (
    "context"
    "fmt"
    "net/http"
    "reflect"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/conversion"
    "k8s.io/apimachinery/pkg/runtime"
)



// HighWaterMark records the highest queue length seen so far
type HighWaterMark int64

// Update atomically raises the mark; it returns true only when
// current sets a new maximum
func (hwm *HighWaterMark) Update(current int64) bool {
    for {
        old := atomic.LoadInt64((*int64)(hwm))
        if current <= old {
            return false
        }
        if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
            return true
        }
    }
}



// ready gates access to the cache until the initial list has completed
type ready struct {
    ok bool
    c  *sync.Cond
}

// newReady builds a gate that starts out unready
func newReady() *ready {
    return &ready{c: sync.NewCond(&sync.Mutex{})}
}

// wait blocks until the cache is ready
func (r *ready) wait() {
    r.c.L.Lock()
    for !r.ok {
        r.c.Wait()
    }
    r.c.L.Unlock()
}

// check returns the current readiness without blocking
func (r *ready) check() bool {
    r.c.L.Lock()
    defer r.c.L.Unlock()
    return r.ok
}

// set flips the readiness state and wakes all waiters
func (r *ready) set(ok bool) {
    r.c.L.Lock()
    defer r.c.L.Unlock()
    r.ok = ok
    r.c.Broadcast()
}



// TypeMeta identifies the kind and API version of a request
type TypeMeta struct {
    Kind       string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
    APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}

// ListOption carries the parameters of a list/watch request
type ListOption struct {
    TypeMeta `json:",inline"`

    LabelSelector string
    FieldSelector string
    // Whether to include uninitialized resources
    IncludeUninitialized bool
    // Watch requests a stream of add/update/delete event notifications
    Watch           bool
    ResourceVersion string
    TimeoutSecond   *int64
    Limit           int64
}

// ListerWatcher abstracts listing a snapshot and watching for changes
type ListerWatcher interface {
    List(option ListOption) (Object, error)
    Watch(option ListOption) (Interface, error)
}



// Reflector lists and watches the backend, pushing results into store
type Reflector struct {
    name string

    expectedType reflect.Type

    store Store

    listerWatcher ListerWatcher
}

// ListAndWatch fetches the latest snapshot and then watches for changes
// (the body is elided in this article)
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    return nil
}



// MatchValue is an index name/value pair extracted from an object
type MatchValue struct {
    IndexName string
    Value     string
}

// TriggerPublisherFunc extracts the trigger match values of an object
type TriggerPublisherFunc func(obj Object) []MatchValue

// filterWithAttrsFunc reports whether an object, given its key, labels
// and fields, passes a watcher's filter
type filterWithAttrsFunc func(key string, l Set, f Set, uninitialized bool) bool
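// The methods below call filterWithAttrsFunc as if it were a constructor,
// but only the type is defined. A minimal reconstruction of that
// constructor (an assumption, modeled on the real filterWithAttrsFunction):
func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc {
    return func(objKey string, label Set, field Set, uninitialized bool) bool {
        if !strings.HasPrefix(objKey, key) {
            return false // the object lives outside the watched key prefix
        }
        if uninitialized && !p.IncludeUninitialized {
            return false
        }
        return p.Label.Matches(label) && p.Field.Matches(field)
    }
}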



// cacheWatcher implements the watch interface
type cacheWatcher struct {
    sync.Mutex
    input     chan *watchCacheEvent
    result    chan Event
    done      chan struct{}
    filter    filterWithAttrsFunc
    stopped   bool
    forget    func(bool)
    versioner Versioner
}

// Stop unregisters the watcher and releases its resources
func (c *cacheWatcher) Stop() {
    c.forget(true)
    c.stop()
}

func (c *cacheWatcher) stop() {
    c.Lock()
    defer c.Unlock()
    // Only close the channels on the first call
    if !c.stopped {
        c.stopped = true
        close(c.done)
        close(c.input)
    }
}

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
    watcher := &cacheWatcher{
        input:     make(chan *watchCacheEvent, chanSize),
        result:    make(chan Event, chanSize),
        done:      make(chan struct{}),
        filter:    filter,
        stopped:   false,
        forget:    forget,
        versioner: versioner,
    }
    // process (not shown in this article) replays initEvents and then
    // forwards filtered input events to the result channel
    go watcher.process(initEvents, resourceVersion)
    return watcher
}



type watchersMap map[int]*cacheWatcher

func (wm watchersMap) terminateAll() {
    for key, watcher := range wm {
        delete(wm, key)
        watcher.Stop()
    }
}
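// addWatcher and deleteWatcher are called below but missing from the
// original listing; these one-liners (matching the real watchersMap)
// fill the gap.
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
    wm[number] = w
}

func (wm watchersMap) deleteWatcher(number int) {
    delete(wm, number)
}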



type indexedWatchers struct {
    allWatchers   watchersMap
    valueWatchers map[string]watchersMap
}

func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
    if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
        fmt.Printf("Terminating all watchers from cacher %v\n", objectType)
    }
    i.allWatchers.terminateAll()
    for index, watchers := range i.valueWatchers {
        watchers.terminateAll()
        delete(i.valueWatchers, index)
    }
}

func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
    if supported {
        i.valueWatchers[value].deleteWatcher(number)
        if len(i.valueWatchers[value]) == 0 {
            delete(i.valueWatchers, value)
        }
    } else {
        i.allWatchers.deleteWatcher(number)
    }
}

func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
    if supported {
        if _, ok := i.valueWatchers[value]; !ok {
            i.valueWatchers[value] = watchersMap{}
        }
        i.valueWatchers[value].addWatcher(w, number)
    } else {
        i.allWatchers.addWatcher(w, number)
    }
}



// timeBudget caps how long event dispatch may block on slow watchers
type timeBudget struct {
    sync.Mutex
    budget time.Duration

    refresh   time.Duration
    maxBudget time.Duration
}
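// takeAvailable and returnUnused are not in the original listing; this is
// a hedged reconstruction of how the real timeBudget hands out wait time.
// takeAvailable drains whatever budget is currently available.
func (t *timeBudget) takeAvailable() time.Duration {
    t.Lock()
    defer t.Unlock()
    result := t.budget
    t.budget = 0
    return result
}

// returnUnused gives unspent time back, capped at maxBudget.
func (t *timeBudget) returnUnused(unused time.Duration) {
    t.Lock()
    defer t.Unlock()
    if unused < 0 {
        return
    }
    if t.budget = t.budget + unused; t.budget > t.maxBudget {
        t.budget = t.maxBudget
    }
}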



// Labels is read-only access to an object's labels
type Labels interface {
    Has(label string) (exists bool)
    Get(label string) (value string)
}

// Selector matches objects by their labels or fields
type Selector interface {
    Matches(Labels) bool
    Empty() bool
    String() string
    RequiresExactMatch(field string) (value string, found bool)
    DeepCopySelector() Selector
}



// AttrFunc extracts the label and field sets of an object
type AttrFunc func(obj Object) (Set, Set, bool, error)

// SelectionPredicate describes how to select objects from a list
type SelectionPredicate struct {
    Label                Selector
    Field                Selector
    IncludeUninitialized bool
    GetAttrs             AttrFunc
    IndexFields          []string
    Limit                int64
    Continue             string
}

// MatcherIndex returns the exact-match index values the predicate requires
func (s *SelectionPredicate) MatcherIndex() []MatchValue {
    var result []MatchValue
    for _, field := range s.IndexFields {
        if value, ok := s.Field.RequiresExactMatch(field); ok {
            result = append(result, MatchValue{IndexName: field, Value: value})
        }
    }
    return result
}



type Feature string

type FeatureGate interface {
    Enabled(key Feature) bool
}

type UID string

// Preconditions must hold for an update or delete to proceed
type Preconditions struct {
    UID *UID
}

// StatusError wraps a metav1.Status as an error object
type StatusError struct {
    ErrStatus metav1.Status
}

// errWatcher delivers a single error event and then closes
type errWatcher struct {
    result chan Event
}



func newErrWatcher(err error) *errWatcher {
    errEvent := Event{Type: Error}
    switch err := err.(type) {
    case Object:
        errEvent.Object = err
    case StatusError:
        errEvent.Object = &err.ErrStatus
    default:
        errEvent.Object = &metav1.Status{
            Status:  metav1.StatusFailure,
            Message: err.Error(),
            Reason:  metav1.StatusReasonInternalError,
            Code:    http.StatusInternalServerError,
        }
    }

    // The original listing forgot to build and return the watcher:
    // deliver the single error event and close the channel
    watcher := &errWatcher{result: make(chan Event, 1)}
    watcher.result <- errEvent
    close(watcher.result)
    return watcher
}

type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)





// Cacher wraps the backend store and fans events out to watchers
type Cacher struct {
    // High-water mark of the incoming event queue length
    incomingHWM HighWaterMark
    // Incoming event channel; events here are dispatched to all watchers
    incoming chan watchCacheEvent

    sync.RWMutex

    // The cache may only be accessed once ready reports OK
    ready *ready

    // Back-end storage interface
    storage Store

    // Type of the cached objects
    objectType reflect.Type

    watchCache watchCache
    reflector  *Reflector

    versioner Versioner

    triggerFunc TriggerPublisherFunc
    watcherIdx  int
    watchers    indexedWatchers

    dispatchTimeoutBudget *timeBudget

    stopLock sync.RWMutex
    stopped  bool
    stopCh   chan struct{}
    stopWg   sync.WaitGroup
}



func (c *Cacher) Versioner() Versioner {
    return c.storage.Versioner()
}

// Create and Delete simply pass through to the backend storage
func (c *Cacher) Create(ctx context.Context, key string, out Object, preconditions *Preconditions) error {
    return c.storage.Create(ctx, key, out, preconditions)
}

func (c *Cacher) Delete(ctx context.Context, key string, out Object, preconditions *Preconditions) error {
    return c.storage.Delete(ctx, key, out, preconditions)
}



func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
    watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
    if err != nil {
        return nil, err
    }

    c.ready.wait()

    c.watchCache.RLock()
    defer c.watchCache.RUnlock()
    initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
    if err != nil {
        return newErrWatcher(err), nil
    }

    triggerValue, triggerSupported := "", false
    if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
        triggerValue, triggerSupported = matchValues[0].Value, true
    }

    chanSize := 10
    if c.triggerFunc != nil && !triggerSupported {
        chanSize = 100
    }

    c.Lock()
    defer c.Unlock()
    forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
    watcher := newCacheWatcher(watchRV, chanSize, initEvents, filterWithAttrsFunction(key, pred), forget, c.versioner)

    c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
    c.watcherIdx++
    return watcher, nil
}



func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
    return c.Watch(ctx, key, resourceVersion, pred)
}



func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr Object, ignoreNotFound bool) error {
    if resourceVersion == "" {
        // An empty resourceVersion asks for the freshest data: bypass the cache
        return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
    }

    getRV, err := c.versioner.ParseResourceVersion(resourceVersion)
    if err != nil {
        return err
    }

    if getRV == 0 && !c.ready.check() {
        // The cache has not finished its initial list yet; fall back to storage
        return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
    }

    c.ready.wait()

    objValue, err := conversion.EnforcePtr(objPtr)
    if err != nil {
        return err
    }

    obj, exists, _, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
    if err != nil {
        return err
    }

    if exists {
        elem, ok := obj.(*storeElement)
        if !ok {
            return fmt.Errorf("non *storeElement returned from storage: %v", obj)
        }
        objValue.Set(reflect.ValueOf(elem.Object).Elem())
    } else {
        objValue.Set(reflect.Zero(objValue.Type()))
        if !ignoreNotFound {
            return fmt.Errorf("key %q not found at resourceVersion %d", key, getRV)
        }
    }
    return nil
}



func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj Object) error {
    if resourceVersion == "" {
        // An empty resourceVersion asks for the most up-to-date data:
        // go straight to the backend storage
        return c.storage.List(ctx, key, resourceVersion, pred, listObj)
    }

    listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
    if err != nil {
        return err
    }

    if listRV == 0 && !c.ready.check() {
        return c.storage.List(ctx, key, resourceVersion, pred, listObj)
    }

    c.ready.wait()

    listPtr, err := meta.GetItemsPtr(listObj)
    if err != nil {
        return err
    }
    listVal, err := conversion.EnforcePtr(listPtr)
    if err != nil || listVal.Kind() != reflect.Slice {
        return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
    }

    filter := filterWithAttrsFunction(key, pred)

    objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
    if err != nil {
        return err
    }

    if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
        // More objects than the slice can hold: allocate a bigger one
        listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
    }
    for _, obj := range objs {
        elem, ok := obj.(*storeElement)
        if !ok {
            return fmt.Errorf("non *storeElement returned from storage: %v", obj)
        }
        if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
            // (the reflection here is worth studying separately)
            listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
        }
    }
    if c.versioner != nil {
        if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
            return err
        }
    }
    return nil
}



func (c *Cacher) GuaranteedUpdate(
    ctx context.Context, key string, ptrToType Object, ignoreNotFound bool,
    preconditions *Preconditions, tryUpdate UpdateFunc, _ ...Object) error {
    // If the object is already cached, hand its copy to storage as a hint
    if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
        fmt.Printf("GetByKey returned error: %v\n", err)
    } else if exists {
        currObj := elem.(*storeElement).Object.DeepCopyObject()
        return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
    }
    return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}

func (c *Cacher) Count(pathPrefix string) (int64, error) {
    return c.storage.Count(pathPrefix)
}
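// isStopped is called by NewCacherFromConfig but missing from the
// original listing; a reconstruction matching the real method:
func (c *Cacher) isStopped() bool {
    c.stopLock.RLock()
    defer c.stopLock.RUnlock()
    return c.stopped
}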



// triggerValues collects the trigger values of the event's current
// and previous object
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
    if c.triggerFunc == nil {
        return nil, false
    }

    result := make([]string, 0, 2)
    matchValues := c.triggerFunc(event.Object)
    if len(matchValues) > 0 {
        result = append(result, matchValues[0].Value)
    }
    if event.PrevObject == nil {
        return result, len(result) > 0
    }
    prevMatchValues := c.triggerFunc(event.PrevObject)
    if len(prevMatchValues) > 0 {
        if len(result) == 0 || result[0] != prevMatchValues[0].Value {
            result = append(result, prevMatchValues[0].Value)
        }
    }
    return result, len(result) > 0
}



func (c *Cacher) processEvent(event *watchCacheEvent) {
    if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
        fmt.Printf("cacher %v: %v objects queued in incoming channel\n", c.objectType.String(), curLen)
    }
    c.incoming <- *event
}



func (c *Cacher) dispatchEvents() {
    for {
        select {
        case event, ok := <-c.incoming:
            if !ok {
                return
            }
            c.dispatchEvent(&event)
        case <-c.stopCh:
            return
        }
    }
}

func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
    triggerValues, supported := c.triggerValues(event)

    c.Lock()
    defer c.Unlock()

    for _, watcher := range c.watchers.allWatchers {
        watcher.add(event, c.dispatchTimeoutBudget)
    }
    if supported {
        for _, triggerValue := range triggerValues {
            for _, watcher := range c.watchers.valueWatchers[triggerValue] {
                watcher.add(event, c.dispatchTimeoutBudget)
            }
        }
    } else {
        for _, watchers := range c.watchers.valueWatchers {
            for _, watcher := range watchers {
                watcher.add(event, c.dispatchTimeoutBudget)
            }
        }
    }
}





func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj Object) error {
    if resourceVersion == "" || len(pred.Continue) > 0 || pred.Limit > 0 {
        // No resourceVersion, or pagination requested: read from storage directly
        return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
    }

    // Parse the requested list resourceVersion
    listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
    if err != nil {
        return err
    }

    // listRV is 0 and the cache has not finished its initial list yet
    if listRV == 0 && !c.ready.check() {
        return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
    }

    c.ready.wait()

    listPtr, err := meta.GetItemsPtr(listObj)
    if err != nil {
        return err
    }
    listVal, err := conversion.EnforcePtr(listPtr)
    if err != nil || listVal.Kind() != reflect.Slice {
        return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
    }
    filter := filterWithAttrsFunction(key, pred)

    // Wait until the cache has caught up to listRV, then fetch the object
    obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, nil)
    if err != nil {
        return err
    }

    if exists {
        elem, ok := obj.(*storeElement)
        if !ok {
            return fmt.Errorf("non *storeElement returned from storage: %v", obj)
        }
        if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
            listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
        }
    }
    if c.versioner != nil {
        if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
            return err
        }
    }
    return nil
}





func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
    successfulList := false
    c.watchCache.SetOnReplace(func() {
        successfulList = true
        c.ready.set(true)
    })
    defer func() {
        if successfulList {
            c.ready.set(false)
        }
    }()
    c.terminateAllWatchers()
    if err := c.reflector.ListAndWatch(stopChannel); err != nil {
        fmt.Printf("unexpected ListAndWatch error: %v\n", err)
    }
}

func (c *Cacher) terminateAllWatchers() {
    c.Lock()
    defer c.Unlock()
    c.watchers.terminateAll(c.objectType)
}

// forgetWatcher returns the callback a watcher uses to unregister itself
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
    return func(lock bool) {
        if lock {
            c.Lock()
            defer c.Unlock()
        } else {
            // Called from dispatch with the lock already held: the watcher
            // is being force-closed for being unresponsive
            fmt.Printf("Forcing watcher close due to unresponsiveness: %v\n", c.objectType.String())
        }
        c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
    }
}


Reflections

At its core, this storage layer implements change notification on top of the data: a Watcher subscribes to the changes it cares about and relays the corresponding responses. Next I may pause here and look at the apiserver side, then at the Controller and the client-go part, but that is for another time. Good night.

#k8s
K8s watchCache cache sliding window