I recently joined the K8S source code study group run by the cloud native community, started reading the K8S source code, and have been writing up my notes. Anyone interested is welcome to join us and learn together; experienced members of the group and the community are ready to answer your questions. Github.com/cloudnative…

The importance of Informer in K8S needs no introduction, so let's get straight to the point.

Let’s start with an invocation diagram

Because the Informer source code is fairly complex and the call chain is long, the source code analysis that follows is organized around this diagram.

Overview

In K8S, components communicate with each other over HTTP. Without relying on any middleware, the messages still need to be reliable, timely, and ordered. How does K8S achieve this? Through the Informer: the other K8S components communicate with the API server through Informers.

Working principles of Informer

The components include:

  • Reflector: Watches the specified resource. When a watched resource changes, a change event is triggered and the resource object is put into the local queue, DeltaFIFO
  • DeltaFIFO: A queue of resource objects together with the operation types applied to them
    • FIFO: A first-in, first-out queue that provides operations such as adding, deleting, modifying, and querying resource objects
    • Delta: Stores a resource object together with its operation type, for example Added, Updated, Deleted, or Sync
  • Indexer: Local storage that holds resource objects and supports indexing
    • Resource objects consumed from DeltaFIFO are stored in Indexer
    • The data in Indexer is kept consistent with etcd, so client-go can read it locally, which takes pressure off etcd and the API server
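
The data flow between these three components can be sketched with the standard library alone. This is a toy model rather than client-go code: the names Delta, DeltaType, and runPipeline are invented for the illustration, objects are plain strings, and a buffered channel stands in for DeltaFIFO.

```go
package main

import "fmt"

// DeltaType mirrors the operation types listed above (names chosen for this sketch).
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta pairs an operation type with the object it applies to.
type Delta struct {
	Type DeltaType
	Key  string
	Obj  string
}

// runPipeline plays the three roles in sequence: a "reflector" pushes deltas
// into a queue, and a consumer drains the queue, updates the local store
// (the "indexer"), and records one notification per delta.
func runPipeline(events []Delta) (map[string]string, []string) {
	queue := make(chan Delta, len(events)) // stands in for DeltaFIFO
	for _, e := range events {             // stands in for Reflector
		queue <- e
	}
	close(queue)

	store := map[string]string{} // stands in for Indexer
	var notifications []string
	for d := range queue {
		if d.Type == Deleted {
			delete(store, d.Key)
		} else {
			store[d.Key] = d.Obj
		}
		notifications = append(notifications, fmt.Sprintf("%s %s", d.Type, d.Key))
	}
	return store, notifications
}

func main() {
	store, notes := runPipeline([]Delta{
		{Added, "default/web", "v1"},
		{Updated, "default/web", "v2"},
		{Added, "default/db", "v1"},
		{Deleted, "default/db", ""},
	})
	fmt.Println(store)
	fmt.Println(notes)
}
```

The point of the sketch is the ordering: the local store is updated first and the notification is produced afterwards, so a handler that reads the store sees a state at least as new as the event it was given.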

Example of Informer

  • Create a Clientset object with kubernetes.NewForConfig. The Informer needs the Clientset to interact with the API server
  • Create a stop channel to tell the Informer to exit before the process exits, because the Informer runs in a long-lived goroutine
  • informers.NewSharedInformerFactory instantiates the SharedInformerFactory object
    • The first parameter is the Clientset
    • The second parameter is the resync interval
  • The Informer method gets the Informer object for a particular resource
  • The AddEventHandler function registers callbacks on the Informer; three kinds of callbacks are supported
    • AddFunc: Callback triggered when a resource object is created
    • UpdateFunc: Callback triggered when a resource object is updated
    • DeleteFunc: Callback triggered when a resource object is deleted
  • The Run method runs the current Informer
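// Use the informer mechanism to watch K8S resources
// Assumed imports: v1 "k8s.io/apimachinery/pkg/apis/meta/v1", "k8s.io/client-go/informers",
// "k8s.io/client-go/kubernetes", "k8s.io/client-go/tools/cache", plus "log" and "time"
func informer() {
  // The informer runs in a long-lived goroutine, so a channel is used to tell it to quit before the process exits
  stopChan := make(chan struct{})
  defer close(stopChan)

  // Create a client object to connect to K8S
  // config is a *rest.Config that is assumed to be built elsewhere
  clientSet, err := kubernetes.NewForConfig(config)
  if err != nil {
    log.Printf("init clientset error: %v", err)
    return
  }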

  // Step 1: Create the sharedInformer object. The second parameter is the interval for resynchronizing data
  sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
  // Step 2: Each resource has an informer object. Here we get the pod resource's informer object
  podInformer := sharedInformers.Core().V1().Pods().Informer()
  // Step 3: Add custom callback functions
  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    // Add a resource callback function that returns the interface type and needs to be cast to the real type
    AddFunc: func(obj interface{}) {
      mObj := obj.(v1.Object)
      log.Printf("New pod added: %s", mObj.GetName())
    },
    // Update the resource callback function
    UpdateFunc: func(oldObj, newObj interface{}) {
      oObj := oldObj.(v1.Object)
      nObj := newObj.(v1.Object)
      log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
    },
    // Delete the resource callback function
    DeleteFunc: func(obj interface{}) {
      mObj := obj.(v1.Object)
      log.Printf("pod deleted from store: %s", mObj.GetName())
    },
  })
  // Step 4: Start running the Informer object
  podInformer.Run(stopChan)
}

Resource Informer and SharedInformer

In the previous demo, the first step was to create a SharedInformer object. Now let’s look at Informer and SharedInformer

Resource Informer

  • Each resource type implements the Informer mechanism, so events for different resources can be watched
  • Each resource's Informer interface exposes the Informer and Lister methods
type PodInformer interface {
  Informer() cache.SharedIndexInformer
  Lister() v1.PodLister
}

SharedInformer

If the Informer of the same resource is instantiated multiple times and each Informer uses its own Reflector, too many identical ListAndWatch calls will run, and the repeated serialization and deserialization will overload the API server

SharedInformer lets Informers of the same resource type share one Reflector. Internally, the factory defines a map field that holds all Informers.

In the first step of the demo, sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute) creates the SharedInformer. Internally this initializes a sharedInformerFactory object, so let's look at sharedInformerFactory

Source location: vendor/k8s.io/client-go/informers/factory.go

type sharedInformerFactory struct {
  client           kubernetes.Interface
  namespace        string
  tweakListOptions internalinterfaces.TweakListOptionsFunc
  lock             sync.Mutex
  defaultResync    time.Duration
  customResync     map[reflect.Type]time.Duration

  // Store shared informer by type
  informers map[reflect.Type]cache.SharedIndexInformer

  // This field is used to track if Informers is started
  // The Start() method can be safely called multiple times (idempotent)
  startedInformers map[reflect.Type]bool
}
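
To make the role of the two maps concrete, here is a minimal standard-library sketch of the same idea: one map caches informers by reflect.Type so that repeated requests share an instance, and a second map records which ones are already running so that Start can be called repeatedly. The toyFactory and toyInformer types, and the fact that Start returns a count, are assumptions made for the illustration and are not the client-go API.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// Pod and Node are placeholder resource types for the example.
type Pod struct{}
type Node struct{}

// toyInformer is a hypothetical stand-in for cache.SharedIndexInformer.
type toyInformer struct {
	kind string
}

// Run blocks until the stop channel is closed, like a real informer would.
func (i *toyInformer) Run(stopCh <-chan struct{}) {
	<-stopCh
}

// toyFactory caches informers by type and tracks which ones were started.
type toyFactory struct {
	lock      sync.Mutex
	informers map[reflect.Type]*toyInformer
	started   map[reflect.Type]bool
}

func newToyFactory() *toyFactory {
	return &toyFactory{
		informers: map[reflect.Type]*toyInformer{},
		started:   map[reflect.Type]bool{},
	}
}

// InformerFor returns the informer already registered for obj's type,
// or creates and caches a new one.
func (f *toyFactory) InformerFor(obj interface{}) *toyInformer {
	f.lock.Lock()
	defer f.lock.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := &toyInformer{kind: t.String()}
	f.informers[t] = inf
	return inf
}

// Start launches every informer that is not running yet and reports how
// many it launched, which makes repeated calls harmless.
func (f *toyFactory) Start(stopCh <-chan struct{}) int {
	f.lock.Lock()
	defer f.lock.Unlock()
	n := 0
	for t, inf := range f.informers {
		if !f.started[t] {
			go inf.Run(stopCh)
			f.started[t] = true
			n++
		}
	}
	return n
}

func main() {
	f := newToyFactory()
	a := f.InformerFor(&Pod{})
	b := f.InformerFor(&Pod{})
	f.InformerFor(&Node{})

	stop := make(chan struct{})
	defer close(stop)
	fmt.Println("same pod informer:", a == b)
	fmt.Println("started by first call:", f.Start(stop))
	fmt.Println("started by second call:", f.Start(stop))
}
```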

The Start method

In the K8S controller-manager component, the Run method in the source code calls the SharedInformerFactory's Start method

Source location: cmd/kube-controller-manager/app/controllermanager.go

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
  ...
  controllerContext.InformerFactory.Start(controllerContext.Stop)
  ...
}

Source location: k8s.io/client-go/informers/factory.go

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  ...
  // Iterate through all informers
  for informerType, informer := range f.informers {
    if !f.startedInformers[informerType] {
      // Each informer runs its Run method in its own goroutine
      go informer.Run(stopCh)
      f.startedInformers[informerType] = true
    }
  }
}

Get Informer

In the previous demo, the second step calls podInformer := sharedInformers.Core().V1().Pods().Informer() to get the informer instance. Let's start with the Informer method

The key logic includes:

  • Initializing the sharedProcessor
  • Registering the List and Watch methods for the specific resource type
  • Initializing the Indexer, whose implementation is the cache type

Taking Pod as an example, source location: client-go/informers/core/v1/pod.go

// Get the pod informer; internally this calls InformerFor and passes f.defaultInformer as an argument
func (f *podInformer) Informer() cache.SharedIndexInformer {
  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
  // The cache.Indexers argument sets up the indexers; by default objects are indexed by namespace
  return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
  return cache.NewSharedIndexInformer(
    // Register the List, Watch methods
    &cache.ListWatch{
      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
        if tweakListOptions != nil {
          tweakListOptions(&options)
        }
        // The List method is the List method of the resource object (in this case pod).
        return client.CoreV1().Pods(namespace).List(context.TODO(), options)
      },
      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
        if tweakListOptions != nil {
          tweakListOptions(&options)
        }
        // The Watch method is the Watch method of the resource object.
        return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
      },
    },
    &corev1.Pod{},
    resyncPeriod,
    indexers,
  )
}

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  realClock := &clock.RealClock{}
  sharedIndexInformer := &sharedIndexInformer{
    // This is the processor initialization
    processor:                       &sharedProcessor{clock: realClock},
    // The interface is Indexer and the implementation class is cache
    indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    listerWatcher:                   lw,
    objectType:                      exampleObject,
    resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
    cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
    clock:                           realClock,
  }
  return sharedIndexInformer
}

// Index interface, implementation class is the cache class
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
  return &cache{
    cacheStorage: NewThreadSafeStore(indexers, Indices{}),
    keyFunc:      keyFunc,
  }
}

Register custom callback functions

After obtaining the Informer object, the third step is to register custom callback functions on the informer, so that your own business logic runs when K8S resources change. The following analyzes the logic of podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{...})

// Start registering event handlers
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
  s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
  ...
  // Each registered handler is wrapped in a listener instance
  // Each listener holds a handler object. When an event occurs later, the framework calls the handler's methods, which runs the code the user registered
  listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
  ...
  // Add the listener to the sharedProcessor
  s.processor.addListener(listener)
  for _, item := range s.indexer.List() {
    listener.add(addNotification{newObj: item})
  }
}

// Add the listener to the sharedProcessor
func (p *sharedProcessor) addListener(listener *processorListener) {
  ...
  if p.listenersStarted {
    // The listener starts two key goroutines in the background, which are described later
    p.wg.Start(listener.run)
    p.wg.Start(listener.pop)
  }
}

Run method

In the previous demo, after the informer instance is obtained, the last step is to call the Run method. Now we start parsing the logic of the Run method. The core logic includes:

  • Initialization of DeltaFIFO
  • Initialization of Controller
  • Run the sharedProcessor's run method
  • Run the controller's Run method

Source location: k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  // Initialize DeltaFIFO
  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KnownObjects:          s.indexer,
    EmitDeltaTypeReplaced: true,
  })
  // Initialize the config
  // Focus here on the ListerWatcher object and the Process field, which is set to the HandleDeltas function
  // HandleDeltas is the core method for consuming incremental information (Delta objects)
  cfg := &Config{
    Queue:            fifo,
    // The ListerWatcher object
    ListerWatcher:    s.listerWatcher,
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,
    // Register the callback function HandleDeltas to store resource changes to the local Indexer
    Process:           s.HandleDeltas,
    WatchErrorHandler: s.watchErrorHandler,
  }
  // This is mainly the initialization of the controller
  func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()
    // Initialize the Controller object
    s.controller = New(cfg)
    s.controller.(*controller).clock = s.clock
    s.started = true
  }()
  // s.cacheMutationDetector.Run checks whether cached objects have been mutated
  wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
  // Execute the sharedProcessor.run method
  // This method is very important
  wg.StartWithChannel(processorStopCh, s.processor.run)
  ...
  // Call the Controller's Run method
  s.controller.Run(stopCh)
}

The run method of sharedProcessor

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
  func() {
    ...
    // For every listener of the sharedProcessor, start two goroutines in the background
    // that execute the listener's run and pop methods respectively
    for _, listener := range p.listeners {
      p.wg.Start(listener.run)
      p.wg.Start(listener.pop)
    }
    p.listenersStarted = true
  }()
  ...
}

The run method of processorListener

The processorListener represents a consumer. It continuously takes the incremental information that originated from the API server off its own nextCh channel and then calls the handler methods that the user registered earlier.

All we need to know here is that the run method is the consumer and is responsible for consuming events. Later we'll see who the producer is, that is, who puts incremental information into the processorListener's nextCh channel (it is actually the pop method below)

func (p *processorListener) run() {

  stopCh := make(chan struct{})
  wait.Until(func() {
    // The consumer method, which constantly gets events from the channel
    for next := range p.nextCh {
      switch notification := next.(type) {
      case updateNotification:
        // Call handler's method,
        p.handler.OnUpdate(notification.oldObj, notification.newObj)
      case addNotification:
        p.handler.OnAdd(notification.newObj)
      case deleteNotification:
        p.handler.OnDelete(notification.oldObj)
      default:
        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
      }
    }
    // the only way to get here is if the p.nextCh is empty and closed
    close(stopCh)
  }, 1*time.Second, stopCh)
}

// the OnUpdate method internally calls the UpdateFunc method registered in demo
// Other methods are similar
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
  if r.UpdateFunc != nil {
    r.UpdateFunc(oldObj, newObj)
  }
}

Pop method of processorListener

The implementation of this method is quite complex, but the overall purpose is simple: to produce messages into nextCh that can then be consumed by the run method described earlier

Rather than handing events over one at a time and blocking the producer, it keeps pending notifications in a buffer

func (p *processorListener) pop() {
  ...
  var nextCh chan<- interface{}
  var notification interface{}
  for {
    select {
    // On first entry nextCh is nil, so this case blocks
    // After the second case has run, notification holds notificationToAdd and nextCh refers to p.nextCh
    case nextCh <- notification:
      // Notification dispatched
      var ok bool
      notification, ok = p.pendingNotifications.ReadOne()
      if !ok { // Nothing to pop
        nextCh = nil // Disable this select case
      }
    // The first time through, execution enters this case and reads from addCh (we'll see who writes to addCh later)
    case notificationToAdd, ok := <-p.addCh:
      if !ok {
        return
      }
      if notification == nil { // No notification to pop (and pendingNotifications is empty)
        notification = notificationToAdd
        // Channels are reference types: after this assignment nextCh refers to p.nextCh, so sending on nextCh sends on p.nextCh
        // This also answers the question of who the producer of nextCh is
        nextCh = p.nextCh
      } else { // There is already a notification waiting to be dispatched
        p.pendingNotifications.WriteOne(notificationToAdd)
      }
    }
  }
}

In the pop function, we see that it mainly consumes from p.addCh. So who is the producer of p.addCh? When the watch detects an event from the API server, the HandleDeltas function is eventually triggered; it updates the Indexer and calls the distribute method to notify all listeners of the event, which adds the data to each listener's addCh channel.
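
The select pattern used by pop is easier to follow in isolation. The sketch below reproduces it with the standard library only: a nil channel disables the send case until there is something to send, and values that cannot be delivered yet wait in a pending buffer. The function name pump, the int payload, and the plain slice used as the buffer are assumptions for the illustration; client-go's pendingNotifications is a growable ring buffer.

```go
package main

import "fmt"

// pump forwards values from in to out without making the sender wait for a
// slow receiver: values that cannot be delivered yet are kept in pending.
func pump(in <-chan int, out chan<- int) {
	defer close(out)
	var pending []int
	var next int
	var sendCh chan<- int // nil until there is something to send
	for {
		select {
		case sendCh <- next:
			// next was delivered; load the following value or disable this case.
			if len(pending) > 0 {
				next, pending = pending[0], pending[1:]
			} else {
				sendCh = nil
			}
		case v, ok := <-in:
			if !ok {
				return
			}
			if sendCh == nil {
				// Nothing is waiting to be sent, so v becomes the next value.
				next = v
				sendCh = out
			} else {
				// A value is already waiting; queue this one behind it.
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in, out := make(chan int), make(chan int)
	go pump(in, out)

	// All three sends complete even though nobody reads from out yet.
	for i := 1; i <= 3; i++ {
		in <- i
	}
	for i := 0; i < 3; i++ {
		fmt.Println(<-out)
	}
	close(in)
}
```

A send on a nil channel blocks forever, so setting sendCh to nil is what switches the first case off while the buffer is empty. As in pop, closing the input makes the loop return immediately, so anything still buffered at that moment is dropped.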

Controller’s Run method

The key logic inside the Run method includes:

  • Initialize the Reflector object
  • Call Reflector’s Run method
    • Call List to retrieve all resource data
    • Call Watch to monitor resource changes in real time and queue them
  • Call the controller’s processLoop method
    • Consume the data in the queue

Source location: k8s.io/client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
  // Call NewReflector to initialize a Reflector
  // The ListWatcher data interface object must be passed in
  r := NewReflector(
    c.config.ListerWatcher,
    c.config.ObjectType,
    c.config.Queue,
    c.config.FullResyncPeriod,
  )
  ...
  // Call Reflector's Run method to start monitoring and process monitoring events
  wg.StartWithChannel(stopCh, r.Run)

  // processLoop is responsible for fetching data from DeltaFIFO and consuming it
  wait.Until(c.processLoop, time.Second, stopCh)
}

Reflector

Pause the source code analysis and take a quick look at Reflector, which is used to monitor specified K8S resources and trigger corresponding change events.

  • NewReflector: creates a Reflector object; the ListerWatcher interface object must be passed in
  • Run: starts watching and processes events

The core function in Run is ListAndWatch, and the flow includes:

  • Getting the resource list data
  • Watching resource objects, using HTTP chunked transfer encoding

Reflector struct

type Reflector struct {
  // name
  name string
  expectedTypeName string
  // The type of resource expected to be placed in the cache store
  expectedType reflect.Type
  // The GVK of the object we expect to place in the store if unstructured.
  expectedGVK *schema.GroupVersionKind
  // The store to sync with; in the informer this is the DeltaFIFO
  store Store
  // The object used to execute List and Watch
  listerWatcher ListerWatcher
  backoffManager wait.BackoffManager
  // resync period
  resyncPeriod time.Duration
  ShouldResync func() bool
  clock clock.Clock
  paginatedResult bool
  // The last version of the resource seen
  lastSyncResourceVersion string
  isLastSyncResourceVersionUnavailable bool
  lastSyncResourceVersionMutex sync.RWMutex
  WatchListPageSize int64
  watchErrorHandler WatchErrorHandler
}

Core method: Run

Core logic includes:

  • Call the List method to get all the data under the resource object
  • Transforms the resource data into a list of resource objects
  • The resource information is stored in DeltaFIFO, replacing the local cache in full
  • Call the Watch method to listen for resources
  • Call the watchHandler function to handle the various events that watch receives
// Run function
func (r *Reflector) Run(stopCh <-chan struct{}) {
  ...
  wait.BackoffUntil(func() {
    // Core function: ListAndWatch
    if err := r.ListAndWatch(stopCh); err != nil {
      r.watchErrorHandler(r, err)
    }
  }, r.backoffManager, true, stopCh)
  ...
}

ListAndWatch method

// ListAndWatch function
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  ...
  if err := func() error {
    ...
    go func() {
      ...
      pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
        // Call the List method to get all the data under the resource object
        return r.listerWatcher.List(opts)
      }))
      ...
    }()
    ...
    // Get the version number of the resource
    resourceVersion = listMetaInterface.GetResourceVersion()
    initTrace.Step("Resource version extracted")
    // Convert the resource data to a list of resource objects
    items, err := meta.ExtractList(list)
    // Store the resource information in DeltaFIFO, replacing the local cache in full
    // Call the replace method internally
    if err := r.syncWith(items, resourceVersion); err != nil {
      return fmt.Errorf("unable to sync list result: %v", err)
    }
    // Set the latest version of the resource
    r.setLastSyncResourceVersion(resourceVersion)
    return nil
  }(); err != nil {
    return err
  }

  go func() {
    ...
    for {
      ...
      // Synchronize resources
      if r.ShouldResync == nil || r.ShouldResync() {
        // Call DeltaFIFO's Resync method
        if err := r.store.Resync(); err != nil {
          ...
        }
      }
      resyncCh, cleanup = r.resyncChan()
    }
  }()

  for {
    ...
    // Watch the resource
    w, err := r.listerWatcher.Watch(options)
    // Handle the events received from the watch (Added, Modified, Deleted)
    if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); ...
  }
}

// syncWith function
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  found := make([]interface{}, 0, len(items))
  for _, item := range items {
    found = append(found, item)
  }
  // Call the store's Replace method
  return r.store.Replace(found, resourceVersion)
}

// cache.Replace
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
  items := make(map[string]interface{}, len(list))
  for _, item := range list {
    key, err := c.keyFunc(item)
    if err != nil {
      return KeyError{item, err}
    }
    items[key] = item
  }
  c.cacheStorage.Replace(items, resourceVersion)
  return nil
}

watchHandler method

The watchHandler function handles all events received by the Watch. All events are stored in ResultChan, including event types and resource objects

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
  ...
  for {
    select {
    ...
    // Receive events from the watch interface's result channel
    case event, ok := <-w.ResultChan():
      ...
      switch event.Type {
      // Handle Added events
      case watch.Added:
        // The store is the DeltaFIFO
        err := r.store.Add(event.Object)
      // Handle Modified events
      case watch.Modified:
        err := r.store.Update(event.Object)
      // Handle Deleted events
      case watch.Deleted:
        err := r.store.Delete(event.Object)
      }
      *resourceVersion = newResourceVersion
      // Set the resource version
      r.setLastSyncResourceVersion(newResourceVersion)
      eventCount++
    }
  }
  ...
}

DeltaFIFO

The source code analysis that follows uses DeltaFIFO, so it is introduced here first.

DeltaFIFO stores the various events returned by the Watch API. The queue can hold several entries with different operation types for the same resource object. DeltaFIFO implements the Queue interface, which in turn embeds the Store interface

Source path: vendor/k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
  // lock/cond protects access to 'items' and 'queue'.
  lock sync.RWMutex
  cond sync.Cond

  // We depend on the property that items in the set are in
  // the queue and vice versa, and that all Deltas in this
  // map have at least one Delta.

  // Map structure storage: key is the key of the resource object, value is the Deltas array of the object
  items map[string]Deltas

  // Stores the keys of the resource objects, in FIFO order
  queue []string

  // populated is true if the first batch of items inserted by Replace() has been populated
  // or Delete/Add/Update was called first.
  populated bool
  // initialPopulationCount is the number of items inserted by the first call of Replace()
  initialPopulationCount int

  // keyFunc is used to make the key used for queued item
  // insertion and retrieval, and should be deterministic.
  keyFunc KeyFunc

  // The Indexer, which is the local object store
  knownObjects KeyListerGetter

  closed     bool
  closedLock sync.Mutex
}

Core functions include:

  • Producer method
  • Consumer method
  • Resync mechanism

Producer method

After the Reflector observes resource changes, it adds events such as Add, Delete, and Update to the DeltaFIFO, which makes it the producer of the queue. Internally, these methods all call queueActionLocked

The entry point is r.store.Add(event.Object), described earlier in watchHandler

func (f *DeltaFIFO) Add(obj interface{}) error {
  ...
  return f.queueActionLocked(Added, obj)
}

func (f *DeltaFIFO) Update(obj interface{}) error {
  ...
  return f.queueActionLocked(Updated, obj)
}

func (f *DeltaFIFO) Delete(obj interface{}) error {
  ...
  return f.queueActionLocked(Deleted, obj)
}

// The full body of Update, as an example; the other methods are similar and all call queueActionLocked internally
func (f *DeltaFIFO) Update(obj interface{}) error {
  f.lock.Lock()
  defer f.lock.Unlock()
  f.populated = true
  // Call queueActionLocked internally
  return f.queueActionLocked(Updated, obj)
}

queueActionLocked method

// Encapsulates Delta events and queues them for consumption (HandleDeltas)
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  // According to the resource object, get the key, usually in namespace/name format
  id, err := f.KeyOf(obj)

  // Encapsulate the watch event type and resource object into a Delta object
  newDeltas := append(f.items[id], Delta{actionType, obj})
  // Deduplicate
  newDeltas = dedupDeltas(newDeltas)

  // Add the Delta object to the queue
  if len(newDeltas) > 0 {
    if _, exists := f.items[id]; !exists {
      f.queue = append(f.queue, id)
    }
    f.items[id] = newDeltas
    f.cond.Broadcast()
  } else {
    delete(f.items, id)
  }
  return nil
}
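
The interplay between the items map and the queue slice can be tried out with a small stand-alone model. The sketch below is an assumption-laden simplification rather than the DeltaFIFO code: toyFIFO and enqueue are invented names, objects are strings, locking is left out, and deduplication is reduced to collapsing two consecutive Deleted deltas by keeping the earlier one.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta records one operation on an object.
type Delta struct {
	Type   DeltaType
	Object string
}

// toyFIFO keeps the per-key delta lists in items and the key order in queue.
type toyFIFO struct {
	items map[string][]Delta
	queue []string
}

func newToyFIFO() *toyFIFO {
	return &toyFIFO{items: map[string][]Delta{}}
}

// enqueue appends a delta for key. A key enters the queue only once, no
// matter how many deltas pile up for it before it is popped.
func (f *toyFIFO) enqueue(t DeltaType, key, obj string) {
	deltas := append(f.items[key], Delta{t, obj})
	// Simplified dedup: two consecutive deletions collapse into one.
	if n := len(deltas); n >= 2 && deltas[n-1].Type == Deleted && deltas[n-2].Type == Deleted {
		deltas = deltas[:n-1]
	}
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = deltas
}

func main() {
	f := newToyFIFO()
	f.enqueue(Added, "default/web", "v1")
	f.enqueue(Updated, "default/web", "v2")
	f.enqueue(Added, "default/db", "v1")
	f.enqueue(Deleted, "default/web", "v2")
	f.enqueue(Deleted, "default/web", "v2") // duplicate deletion is dropped

	fmt.Println("queue:", f.queue)
	for _, key := range f.queue {
		fmt.Println(key, f.items[key])
	}
}
```

Because the queue holds keys and not deltas, a consumer that pops a key receives the whole history accumulated for that object in one batch.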

Consumer method: processLoop

After analyzing Reflector’s Run method, the next step is the Controller’s processLoop method

func (c *controller) Run(stopCh <-chan struct{}) {
  // Call NewReflector to initialize a Reflector
  // The ListWatcher data interface object must be passed in
  r := NewReflector(
    c.config.ListerWatcher,
    c.config.ObjectType,
    c.config.Queue,
    c.config.FullResyncPeriod,
  )
  ...
  // Call Reflector's Run method to start monitoring and process monitoring events
  wg.StartWithChannel(stopCh, r.Run)

  // processLoop is responsible for fetching data from DeltaFIFO and consuming it
  wait.Until(c.processLoop, time.Second, stopCh)
}

func (c *controller) processLoop() {
  for {
    // Fetch the data from the DeltaFIFO queue and hand it to the process function
    // The process function is stored in c.config.Process, which is the HandleDeltas passed in earlier
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    ...
  }
}

The Pop method

Pop is DeltaFIFO's consumer method. It takes a process function as a parameter, which is the callback that receives and handles each popped object

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  for {
    for len(f.queue) == 0 {
      // When the queue is empty, the Pop function blocks until new data is queued
      // If Close is called, the closed state is set and broadcast
      if f.closed {
        return nil, ErrFIFOClosed
      }

      f.cond.Wait()
    }
    // There is data in the queue
    id := f.queue[0]
    ...
    // Pass the data to the upper-level callback function for processing
    err := process(item)
    // If processing asks for a requeue (ErrRequeue), the data is put back into the queue
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)
      err = e.Err
    }
    return item, err
  }
}

Process function: HandleDeltas

  • The Run method sets HandleDeltas as the Process callback
  • The process function passed to Pop inside processLoop is that same HandleDeltas

Core logic includes:

  • Updating the local cache cacheStorage, which essentially means updating the threadSafeMap data structure
  • Notifying all listeners of the event, which simply means adding data to each listener's addCh for the consumer to process
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  // Get all Delta resources
  for _, d := range obj.(Deltas) {
    // Determine the resource type
    switch d.Type {
    // If the following type is used, store the resource in Indexer
    case Sync, Replaced, Added, Updated:
      s.cacheMutationDetector.AddObject(d.Object)
      if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
        // The implementation class of Indexer is the cache described earlier
        if err := s.indexer.Update(d.Object); err != nil {
          return err
        }

        isSync := false
        switch {
        case d.Type == Sync:
          isSync = true
        case d.Type == Replaced:
          if accessor, err := meta.Accessor(d.Object); err == nil {
            if oldAccessor, err := meta.Accessor(old); err == nil {
              isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
            }
          }
        }
        // Distribute the resource object to SharedInformer's event handler
        s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
      } else {
        if err := s.indexer.Add(d.Object); err != nil {
          return err
        }
        s.processor.distribute(addNotification{newObj: d.Object}, false)
      }
    case Deleted:
      if err := s.indexer.Delete(d.Object); err != nil {
        return err
      }
      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    }
  }
  return nil
}

// Update the local cache cacheStorage
// Update the threadSafeMap data structure. The initialization of threadSafeMap is described earlier
func (c *cache) Update(obj interface{}) error {
  key, err := c.keyFunc(obj)
  if err != nil {
    return KeyError{obj, err}
  }
  c.cacheStorage.Update(key, obj)
  return nil
}

// distribute function
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  ...
  if sync {
    for _, listener := range p.syncingListeners {
      // Core method
      listener.add(obj)
    }
  } else {
    for _, listener := range p.listeners {
      listener.add(obj)
    }
  }
}

// listener.add method, where events are added to the listener's addCh channel
// At this point, the earlier question is answered: this is who produces data into p.addCh
func (p *processorListener) add(notification interface{}) {
  // Notifications of every type are added to the channel
  // and consumed by the pop method of processorListener
  p.addCh <- notification
}

Resync mechanism

Three steps in the ListAndWatch method:

  • List
  • Resync
  • Watch

Resync is responsible for putting the resource objects stored locally in Indexer back into DeltaFIFO with the delta type set to Sync. Reflector runs this periodically

func (f *DeltaFIFO) Resync() error {
  // Get the keys of the objects in the local Indexer store
  keys := f.knownObjects.ListKeys()
  for _, k := range keys {
    if err := f.syncKeyLocked(k); err != nil {
      return err
    }
  }
  return nil
}
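
A compact way to picture Resync: walk the locally known objects and queue a Sync delta for each one, except for keys that already have newer deltas waiting. The sketch below models that with plain maps. The resync function, the delta struct, and the string objects are assumptions made for the illustration, and the keys are sorted only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// delta is a minimal stand-in for a queued operation on an object.
type delta struct {
	Type   string
	Object string
}

// resync queues a Sync delta for every known object that has no pending
// deltas and returns the keys it queued. known plays the role of the
// Indexer, pending the role of the per-key delta lists in the queue.
func resync(known map[string]string, pending map[string][]delta) []string {
	keys := make([]string, 0, len(known))
	for k := range known {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order for the example

	var queued []string
	for _, k := range keys {
		if len(pending[k]) > 0 {
			// Newer events are already waiting for this key, so a Sync
			// built from the cached copy would only add noise.
			continue
		}
		pending[k] = append(pending[k], delta{Type: "Sync", Object: known[k]})
		queued = append(queued, k)
	}
	return queued
}

func main() {
	known := map[string]string{"default/web": "v2", "default/db": "v1"}
	pending := map[string][]delta{
		"default/web": {{Type: "Updated", Object: "v3"}},
	}
	fmt.Println("queued for sync:", resync(known, pending))
	fmt.Println("pending:", pending)
}
```

Handlers see these Sync deltas as update notifications in which the old and new objects are the same cached object, which gives controllers a periodic chance to re-reconcile objects that have not changed.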

Indexer

As mentioned in the previous analysis, resource changes are saved to the local Indexer, which is described here.

  • Indexer is the local storage that client-go uses to hold resource objects, with built-in indexing
  • Resource objects consumed from DeltaFIFO are stored in Indexer
  • Indexer data is kept consistent with etcd, so client-go can read it locally, which reduces the pressure on the API server

Four important data structures

Source location: k8s.io/client-go/tools/cache/index.go

// Index maps an indexed value to the set of object keys that have that value
// type Empty struct{}
// type String map[string]Empty
// sets.String uses a map to simulate a set; the map values are empty structs
type Index map[string]sets.String

// An indexer function receives a resource object and returns the list of index values for it
type IndexFunc func(obj interface{}) ([]string, error)

// Stores the indexers. Key is the indexer name and value is the indexer function
type Indexers map[string]IndexFunc

// Stores the indexes. Key is the indexer name and value is the index built by that indexer
type Indices map[string]Index
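
How the four types fit together is clearest when an index is actually built. The sketch below redefines simplified versions of them (a map-based keySet in place of sets.String, and an IndexFunc that takes a concrete Pod struct) and builds a namespace index. Pod, keySet, namespaceIndex, and buildIndices are all hypothetical names for this illustration.

```go
package main

import "fmt"

// Pod is a pared-down object with just enough fields to index.
type Pod struct {
	Namespace, Name string
}

// Simplified versions of the four structures described above.
type keySet map[string]struct{}                // set of object keys
type Index map[string]keySet                   // index value -> object keys
type IndexFunc func(obj Pod) ([]string, error) // object -> index values
type Indexers map[string]IndexFunc             // indexer name -> function
type Indices map[string]Index                  // indexer name -> built index

// namespaceIndex indexes a pod by its namespace.
func namespaceIndex(p Pod) ([]string, error) {
	return []string{p.Namespace}, nil
}

// buildIndices runs every indexer over every pod and records, for each
// index value, which object keys produced it.
func buildIndices(pods []Pod, indexers Indexers) (Indices, error) {
	indices := Indices{}
	for name, fn := range indexers {
		index := Index{}
		for _, p := range pods {
			values, err := fn(p)
			if err != nil {
				return nil, err
			}
			key := p.Namespace + "/" + p.Name
			for _, v := range values {
				if index[v] == nil {
					index[v] = keySet{}
				}
				index[v][key] = struct{}{}
			}
		}
		indices[name] = index
	}
	return indices, nil
}

func main() {
	pods := []Pod{{"default", "web"}, {"default", "db"}, {"kube-system", "dns"}}
	indices, err := buildIndices(pods, Indexers{"namespace": namespaceIndex})
	if err != nil {
		fmt.Println("index error:", err)
		return
	}
	fmt.Println("keys in default:", len(indices["namespace"]["default"]))
	fmt.Println("keys in kube-system:", len(indices["namespace"]["kube-system"]))
}
```

Note that an index stores object keys, not objects, so a lookup by index is two steps: index value to keys, then keys to objects in the main store.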

ThreadSafeMap

Indexer is built on top of ThreadSafeMap, so let's look at ThreadSafeMap first

  • ThreadSafeMap is an in-memory store; data is not written to disk
  • Add, delete, update, and read operations all take a lock to keep the data consistent
  • Internally it uses the indexers and indices described above

Source location: k8s.io/client-go/tools/cache/thread_safe_store.go

type threadSafeMap struct {
  lock  sync.RWMutex
  // The map structure stores resource data
  // The map key is computed by the keyFunc function. By default, MetaNamespaceKeyFunc is used
  // This function computes the key in the form <namespace>/<name> based on the resource object
  // The value is the resource object itself
  items map[string]interface{}
  // Store indexers. Key is the indexer name and value is the indexer implementation function
  indexers Indexers
  // Stores the indexes. Key is the indexer name and value is the index built by that indexer
  indices Indices
}

// Get index result by executing indexer function
// Two parameters are required: the indexer name and the key to retrieve
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
  // Find the specified indexer function
  indexFunc := c.indexers[indexName]
  if indexFunc == nil {
    return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  }
  // Find the index built by that indexer
  index := c.indices[indexName]

  // Find and return data from cached data
  set := index[indexedValue]
  list := make([]interface{}, 0, set.Len())
  for key := range set {
    list = append(list, c.items[key])
  }

  return list, nil
}
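
One detail that ByIndex depends on but does not show is that every write has to keep the index in step with the items map. The sketch below is a reduced model of that bookkeeping with a single hard-coded index by node name. The toyStore and pod types and the ByNode method are assumptions for the illustration; the real threadSafeMap handles arbitrary indexers.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// pod is a pared-down object with the one field we index on.
type pod struct {
	Namespace, Name, Node string
}

// toyStore keeps objects by key plus one index from node name to keys.
type toyStore struct {
	lock   sync.RWMutex
	items  map[string]pod
	byNode map[string]map[string]struct{}
}

func newToyStore() *toyStore {
	return &toyStore{
		items:  map[string]pod{},
		byNode: map[string]map[string]struct{}{},
	}
}

// Update stores p and fixes the index: the key is removed from the old
// node's set (if the pod was known) before being added to the new one.
func (s *toyStore) Update(p pod) {
	key := p.Namespace + "/" + p.Name
	s.lock.Lock()
	defer s.lock.Unlock()
	if old, ok := s.items[key]; ok {
		delete(s.byNode[old.Node], key)
		if len(s.byNode[old.Node]) == 0 {
			delete(s.byNode, old.Node)
		}
	}
	s.items[key] = p
	if s.byNode[p.Node] == nil {
		s.byNode[p.Node] = map[string]struct{}{}
	}
	s.byNode[p.Node][key] = struct{}{}
}

// ByNode returns the sorted keys of all pods on the given node. It only
// needs the read lock, so many readers can run concurrently.
func (s *toyStore) ByNode(node string) []string {
	s.lock.RLock()
	defer s.lock.RUnlock()
	keys := make([]string, 0, len(s.byNode[node]))
	for k := range s.byNode[node] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	s := newToyStore()
	s.Update(pod{"default", "web", "node-1"})
	s.Update(pod{"default", "db", "node-1"})
	s.Update(pod{"default", "web", "node-2"}) // web moves to another node

	fmt.Println("node-1:", s.ByNode("node-1"))
	fmt.Println("node-2:", s.ByNode("node-2"))
}
```

Doing the index maintenance inside the same write lock as the items update is what guarantees that a reader never sees a key in an index without the matching object, or the other way around.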

Conclusion

The Informer mechanism plays an important role in K8S, and its source code is quite complex. While studying it, keep referring to the diagram at the beginning of the article; otherwise it is easy to get lost. Queues and channels are used throughout to decouple components. One personal takeaway is that the analysis becomes much clearer when it is centered on one core question: who puts data into each channel, and who takes data out of it.