I recently joined the cloud-native community organization K8S source code institute and began studying the Kubernetes source code, organizing my notes along the way. Interested readers are welcome to join us and make progress together; there are plenty of experts in the group and the community ready to answer questions. Github.com/cloudnative…
The importance of the Informer in Kubernetes needs no further discussion, so let's get straight to the point.
Let’s start with an invocation diagram
Since the Informer source code is fairly complex and its call chain is very long, the source code analysis that follows will refer back to this diagram.
Overview
In K8S, components communicate with each other over HTTP. Without relying on any middleware, the reliability, real-time delivery, and ordering of messages still need to be guaranteed. How does K8S do it? The answer is the Informer: the other K8S components communicate with the API Server through Informers.
Working principles of Informer
The components include:

- Reflector: watches the specified resource; when the watched resource changes, it triggers the corresponding change event and stores the resource object in the local cache DeltaFIFO
- DeltaFIFO: a FIFO queue of Deltas, combining two concepts:
  - FIFO: a first-in, first-out queue that supports adding, deleting, updating, and querying resource objects
  - Delta: the resource object store, which holds a resource object together with its operation type, for example Added, Updated, Deleted, and Sync
- Indexer: local storage that stores resource objects and provides indexing
  - Resource objects consumed from DeltaFIFO are stored in the Indexer
  - The data in the Indexer is consistent with etcd; client-go can read it locally, taking pressure off etcd and the API Server
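For reference, these operation types correspond to the DeltaType constants defined in client-go; abridged from tools/cache/delta_fifo.go:

```go
type DeltaType string

const (
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
	Replaced DeltaType = "Replaced" // result of a re-list
	Sync     DeltaType = "Sync"     // result of a periodic resync
)

// Delta is what DeltaFIFO stores: the object and what happened to it
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is a list of one or more Delta values for a single object
type Deltas []Delta
```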
Example of Informer
- Create a ClientSet object with kubernetes.NewForConfig; the Informer needs the ClientSet to interact with the API Server
- Create a stop channel to tell the Informer to exit before the process exits, because the Informer is a long-running goroutine
- informers.NewSharedInformerFactory instantiates the SharedInformerFactory object
  - The first parameter is the ClientSet
  - The second parameter is how often to resync
- The Informer method gets the Informer object for a particular resource
- The AddEventHandler function adds callbacks to the object, supporting three kinds of callbacks:
  - AddFunc: triggered when a resource object is created
  - UpdateFunc: triggered when a resource object is updated
  - DeleteFunc: triggered when a resource object is deleted
- The Run method runs the current Informer
```go
// Use the informer mechanism to monitor K8S resources.
// Imports assumed: "log", "time",
//   v1 "k8s.io/apimachinery/pkg/apis/meta/v1",
//   "k8s.io/client-go/informers", "k8s.io/client-go/kubernetes",
//   "k8s.io/client-go/tools/cache".
func informer() {
	// Since the informer is a long-running goroutine, a channel is used to tell it to quit before the process exits
	stopChan := make(chan struct{})
	defer close(stopChan)
	// Create a client object to connect to K8s
	// (config is a *rest.Config assumed to be built elsewhere, e.g. via clientcmd)
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Printf("init clientset error.")
		return
	}
	// Step 1: Create the sharedInformer object. The second parameter is the interval for resynchronizing data
	sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
	// Step 2: Each resource has an informer object. Here we get the pod resource's informer object
	podInformer := sharedInformers.Core().V1().Pods().Informer()
	// Step 3: Add custom callback functions
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		// Add-resource callback; the argument is an interface type and must be cast to the real type
		AddFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("New pod added: %s", mObj.GetName())
		},
		// Update-resource callback
		UpdateFunc: func(oldObj, newObj interface{}) {
			oObj := oldObj.(v1.Object)
			nObj := newObj.(v1.Object)
			log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
		},
		// Delete-resource callback
		DeleteFunc: func(obj interface{}) {
			mObj := obj.(v1.Object)
			log.Printf("pod deleted from store: %s", mObj.GetName())
		},
	})
	// Step 4: Start running the Informer object
	podInformer.Run(stopChan)
}
```
Resource Informer and SharedInformer
In the previous demo, the first step was to create a SharedInformer object. Now let's look at the Informer and the SharedInformer.
Resource Informer
- Every resource implements the Informer mechanism, allowing events on different resources to be monitored
- Every Informer implements the Informer and Lister methods
```go
type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}
```
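As a quick sketch of the Lister side (assuming the sharedInformers factory from the demo above and a synced cache; the names are illustrative):

```go
// Sketch: reading pods from the Informer's local cache via the Lister.
// Requires the import labels "k8s.io/apimachinery/pkg/labels".
podLister := sharedInformers.Core().V1().Pods().Lister()
// List is served entirely from the local cache; no API Server round trip
pods, err := podLister.Pods("default").List(labels.Everything())
if err != nil {
	log.Printf("list pods from cache error: %v", err)
	return
}
for _, pod := range pods {
	log.Printf("cached pod: %s", pod.Name)
}
```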
SharedInformer
If the Informer for the same resource is instantiated multiple times and each Informer uses its own Reflector, then too many identical ListAndWatch calls will run, and the repeated serialization and deserialization will put the API Server under heavy load.
SharedInformer lets Informers of the same resource type share a single Reflector. Internally, a map field is defined to hold all the Informers.
The first step of the earlier demo, sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute), internally initializes a sharedInformerFactory object, so let's look at sharedInformerFactory.
Source location: vendor/k8s.io/client-go/informers/factory.go
```go
type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration
	// Store shared informers by type
	informers map[reflect.Type]cache.SharedIndexInformer
	// This field is used to track whether informers have been started,
	// so the Start() method can safely be called multiple times (idempotent)
	startedInformers map[reflect.Type]bool
}
```
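The sharing itself happens in the factory's InformerFor method, which looks up the informers map by resource type and only creates a new informer on a miss; abridged from the same file:

```go
// Abridged: how the factory shares one informer per resource type
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	// If an informer for this type already exists, reuse it
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	// Otherwise create it once and cache it in the map
	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}
```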
The Start method
In the kube-controller-manager component, the Run method in the source code calls SharedInformerFactory's Start method.
Source location: cmd/kube-controller-manager/app/controllermanager.go
```go
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	...
	controllerContext.InformerFactory.Start(controllerContext.Stop)
	...
}
```
Source location: vendor/k8s.io/client-go/informers/factory.go
```go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	...
	// Iterate over all informers
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			// Each informer starts its own goroutine and runs its Run method
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
```
Get Informer
In the previous demo, the second step is to call podInformer := sharedInformers.Core().V1().Pods().Informer() to get the Informer instance. Let's start with the Informer method.
The key logic includes:

- Initialization of sharedProcessor
- Registration of the List and Watch methods for the specific resource type
- Initialization of the Indexer; the implementation is the cache struct
Taking Pod as an example, source location: client-go/informers/core/v1/pod.go
```go
// Get the pod informer; internally calls InformerFor, passing f.defaultInformer as the argument
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	// The last argument initializes the indexers
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		// Register the List and Watch methods
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				// The List method is the List method of the resource object (here, pod)
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				// The Watch method is the Watch method of the resource object
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		// Initialization of the processor
		processor: &sharedProcessor{clock: realClock},
		// The interface is Indexer; the implementation is the cache struct
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

// NewIndexer returns the Indexer interface; the implementation is the cache struct
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}
```
Register custom callback functions
After obtaining the Informer object, the third step is to register custom callback functions on the informer so that our own business logic runs when K8S resources change. The following analyzes the logic of podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{…}).
```go
// Start registering event handlers
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	...
	// Each handler is registered as a processorListener instance
	// Each listener holds the handler object; the framework calls the handler's methods when events occur, which leads to the user-registered code
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
	...
	// Add the listener to sharedProcessor
	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

// Add the listener to sharedProcessor
func (p *sharedProcessor) addListener(listener *processorListener) {
	...
	if p.listenersStarted {
		// The listener starts two key goroutines in the background, described later
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}
```
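Note that the same shared informer can take multiple handlers; each AddEventHandler call creates its own processorListener, and every listener receives every event independently. A minimal sketch, reusing the podInformer from the demo:

```go
// Sketch: two independent handlers on one shared informer.
// Both run on the same underlying Reflector/DeltaFIFO; no extra watch is opened.
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) { log.Println("handler A saw an add") },
})
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) { log.Println("handler B saw an add") },
})
```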
Run method
In the previous demo, after the informer instance is obtained, the last step is to call the Run method. Now we start parsing the logic of the Run method. The core logic includes:

- Initialization of DeltaFIFO
- Initialization of the Controller
- Running the sharedProcessor.run method
- Running the controller.Run method
Source location: vendor/k8s.io/client-go/tools/cache/shared_informer.go
```go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// Initialize DeltaFIFO
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})
	// Config initialization
	// Focus here on the ListerWatcher object and the Process object, which is bound to the HandleDeltas function
	// HandleDeltas is the core method for consuming incremental information (Delta objects)
	cfg := &Config{
		Queue: fifo,
		// ListAndWatch object
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,
		// Register the callback function HandleDeltas to store resource changes to the local Indexer
		Process:           s.HandleDeltas,
		WatchErrorHandler: s.watchErrorHandler,
	}
	// This is mainly the initialization of the controller
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// Initialize the Controller object
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
	// s.cacheMutationDetector.Run checks whether cached objects are mutated
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	// Execute the sharedProcessor.run method
	// This method is very important
	wg.StartWithChannel(processorStopCh, s.processor.run)
	...
	// Call the Controller's Run method
	s.controller.Run(stopCh)
}
```
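In practice, a caller usually starts the informer (or the whole factory) and then waits for the initial List to land in the local cache before trusting Listers. A sketch, assuming the names from the demo and using the factory's Start as an alternative to calling podInformer.Run directly:

```go
// Sketch: start the factory and block until the pod informer's cache has synced.
sharedInformers.Start(stopChan)
if !cache.WaitForCacheSync(stopChan, podInformer.HasSynced) {
	log.Println("timed out waiting for caches to sync")
	return
}
// From here on, Listers reflect at least the state of the initial List
```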
The run method of sharedProcessor
```go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		...
		// Every listener of sharedProcessor starts two goroutines in the background,
		// pointing to the run and pop methods respectively
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	...
}
```
The run method of processorListener
The processorListener represents a consumer that runs continuously, fetching notifications from its own nextCh channel and then calling the handler methods the user registered earlier.
For now, all we need to know is that the run method is the consumer, responsible for consuming events. Later we'll see who the producer is, i.e. who puts notifications into processorListener's nextCh channel (it's actually the pop method below).
```go
func (p *processorListener) run() {
	stopCh := make(chan struct{})
	wait.Until(func() {
		// The consumer side: keep taking events from the channel
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				// Call the handler's method
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

// OnUpdate internally calls the UpdateFunc registered in the demo
// The other methods are similar
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if r.UpdateFunc != nil {
		r.UpdateFunc(oldObj, newObj)
	}
}
```
Pop method of processorListener
The implementation of this method is quite complex, but the overall purpose is simple: produce messages into nextCh so they can be consumed by the run method described above. Instead of handing over one event at a time, it uses a buffer so a slow consumer does not block the producer.
```go
func (p *processorListener) pop() {
	...
	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		// On the first pass, notification and nextCh are both nil, so this case is disabled (a send on a nil channel blocks)
		// After the second case has run once, notification holds the value taken from addCh
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		// On the first pass, execution enters here and reads data from addCh (we'll see later who writes into addCh)
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				notification = notificationToAdd
				// Channels are reference types: after pointing nextCh at p.nextCh, a send on nextCh is a send on p.nextCh
				// This also answers the question of who produces into nextCh
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}
```
In the pop function we saw that it mainly consumes p.addCh. So who is the producer of p.addCh? When the watch detects an API Server event, the HandleDeltas function is triggered; it updates the Indexer and calls the distribute method to notify all listeners of the event, adding the data to each listener's addCh channel.
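To make the flow from addCh through pendingNotifications to nextCh easier to see, here is a minimal, self-contained model of the pattern (not client-go code; a plain slice stands in for the ring buffer and strings stand in for notifications):

```go
package main

import "fmt"

// pop moves items from addCh to nextCh, buffering when the consumer is slow.
func pop(addCh <-chan string, nextCh chan<- string) {
	defer close(nextCh)
	var pending []string  // stands in for pendingNotifications
	var out chan<- string // nil disables the send case in select
	var current string    // the notification currently being dispatched
	for {
		select {
		case out <- current: // never chosen while out is nil
			if len(pending) > 0 {
				current, pending = pending[0], pending[1:]
			} else {
				out = nil // nothing left to send; disable this case
			}
		case item, ok := <-addCh:
			if !ok { // producer closed; flush what's left, then stop
				if out != nil {
					out <- current
					for _, p := range pending {
						out <- p
					}
				}
				return
			}
			if out == nil { // nothing in flight: dispatch this item next
				current = item
				out = nextCh
			} else { // already dispatching: buffer it
				pending = append(pending, item)
			}
		}
	}
}

func main() {
	addCh := make(chan string)
	nextCh := make(chan string)
	go pop(addCh, nextCh)
	go func() {
		for _, e := range []string{"add pod-a", "update pod-a", "delete pod-a"} {
			addCh <- e
		}
		close(addCh)
	}()
	for e := range nextCh { // the "run" side: consume notifications in order
		fmt.Println("handle:", e)
	}
}
```

The real pop uses a growing ring buffer so that a slow or blocked handler never back-pressures HandleDeltas.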
Controller’s Run method
The key logic inside the Run method includes:

- Initializing the Reflector object
- Calling Reflector's Run method
  - Calling List to retrieve all resource data
  - Calling Watch to monitor resource changes in real time and enqueue them
- Calling the controller's processLoop method
  - Consuming the data in the queue
Source location: k8s.io/client-go/tools/cache/controller.go
```go
func (c *controller) Run(stopCh <-chan struct{}) {
	// Call NewReflector to initialize a Reflector
	// The ListerWatcher data interface object must be passed in
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	...
	// Call Reflector's Run method to start watching and handling events
	wg.StartWithChannel(stopCh, r.Run)
	// processLoop is responsible for fetching data from DeltaFIFO and consuming it
	wait.Until(c.processLoop, time.Second, stopCh)
}
```
Reflector
Let's pause the source code analysis and take a quick look at the Reflector, which watches the specified K8S resources and triggers the corresponding change events.

- NewReflector: creates a Reflector object; a ListerWatcher data interface object must be passed in
- Run: starts watching and handles the watch events

The core function in Run is ListAndWatch, whose flow includes:

- Getting the resource list data
- Watching the resource objects, implemented with HTTP chunked transfer encoding
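To make these two steps concrete, here is what a bare List-then-Watch looks like when written directly against the clientset (illustrative only; the real ListAndWatch adds paging, resync, timeouts, and reconnection). It assumes the clientSet from the demo and the imports "context" and metav1 "k8s.io/apimachinery/pkg/apis/meta/v1":

```go
// Sketch: the essence of ListAndWatch, expressed directly against the clientset.
list, err := clientSet.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
if err != nil {
	log.Printf("list error: %v", err)
	return
}
// Watch from the ResourceVersion returned by the List, so no event in between is missed
w, err := clientSet.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), metav1.ListOptions{
	ResourceVersion: list.ResourceVersion,
})
if err != nil {
	log.Printf("watch error: %v", err)
	return
}
defer w.Stop()
for event := range w.ResultChan() {
	// event.Type is Added/Modified/Deleted/Bookmark/Error
	log.Printf("event: %s", event.Type)
}
```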
Reflector class
```go
type Reflector struct {
	// name
	name             string
	expectedTypeName string
	// The type of resource expected to be placed in the cache store
	expectedType reflect.Type
	// The GVK of the object we expect to place in the store if unstructured
	expectedGVK *schema.GroupVersionKind
	// store is the local store; here the implementation is DeltaFIFO
	store Store
	// The object used to execute List and Watch
	listerWatcher  ListerWatcher
	backoffManager wait.BackoffManager
	// resync period
	resyncPeriod time.Duration
	ShouldResync func() bool
	clock        clock.Clock
	paginatedResult bool
	// The resource version last seen
	lastSyncResourceVersion             string
	isLastSyncResourceVersionUnavailable bool
	lastSyncResourceVersionMutex         sync.RWMutex
	WatchListPageSize int64
	watchErrorHandler WatchErrorHandler
}
```
Core method: Run
Core logic includes:

- Calling the List method to get all the data under the resource object
- Transforming the resource data into a list of resource objects
- Storing the resource information in DeltaFIFO, replacing the local cache in full
- Calling the Watch method to listen for resources
- Calling the watchHandler function to handle the various events the watch receives
```go
// Run function
func (r *Reflector) Run(stopCh <-chan struct{}) {
	...
	wait.BackoffUntil(func() {
		// Core function: ListAndWatch
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	...
}
```
ListAndWatch method
```go
// ListAndWatch function
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	...
	if err := func() error {
		...
		go func() {
			...
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				// Call the List method to get all the data under the resource object
				return r.listerWatcher.List(opts)
			}))
			...
		}()
		...
		// Get the resource version
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
		// Convert the resource data to a list of resource objects
		items, err := meta.ExtractList(list)
		// Store the resource information in DeltaFIFO, replacing the local cache in full
		// Internally calls the Replace method
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("unable to sync list result: %v", err)
		}
		// Set the latest resource version
		r.setLastSyncResourceVersion(resourceVersion)
		return nil
	}(); err != nil {
		return err
	}
	go func() {
		...
		for {
			...
			// Resync resources
			if r.ShouldResync == nil || r.ShouldResync() {
				// Call DeltaFIFO's Resync method
				if err := r.store.Resync(); err != nil {
					...
				}
			}
			resyncCh, cleanup = r.resyncChan()
		}
	}()
	for {
		...
		// Watch for resource changes
		w, err := r.listerWatcher.Watch(options)
		// Handle watch events; this invokes the Add, Update, Delete handlers registered by users
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			...
		}
	}
}

// syncWith function
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	// Call cache.Replace
	return r.store.Replace(found, resourceVersion)
}

// cache.Replace
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
	items := make(map[string]interface{}, len(list))
	for _, item := range list {
		key, err := c.keyFunc(item)
		if err != nil {
			return KeyError{item, err}
		}
		items[key] = item
	}
	c.cacheStorage.Replace(items, resourceVersion)
	return nil
}
```
WatchHandler method
The watchHandler function handles all events received by the watch. All events are delivered via ResultChan and include the event type and the resource object.
```go
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	...
	for {
		select {
		...
		// Get events from the watch interface's channel
		case event, ok := <-w.ResultChan():
			...
			switch event.Type {
			// Handle the Added event
			case watch.Added:
				// store is the DeltaFIFO instance
				err := r.store.Add(event.Object)
			// Handle the Modified event
			case watch.Modified:
				err := r.store.Update(event.Object)
			// Handle the Deleted event
			case watch.Deleted:
				err := r.store.Delete(event.Object)
			}
			*resourceVersion = newResourceVersion
			// Set the resource version
			r.setLastSyncResourceVersion(newResourceVersion)
			eventCount++
		}
	}
	...
}
```
DeltaFIFO
The source code analysis that follows uses DeltaFIFO, so let's introduce it first.
DeltaFIFO is used to store the various events returned by the watch API; the same resource object can exist in the queue with different operation types. DeltaFIFO implements the Queue interface, which extends the Store interface.
Source path: vendor/k8s.io/client-go/tools/cache/delta_fifo.go
```go
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond
	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	// Map storage: key is the resource object's key, value is the object's Deltas array
	items map[string]Deltas
	// Stores the keys of the resource objects
	queue []string
	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int
	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc
	// knownObjects is the Indexer local store
	knownObjects KeyListerGetter
	closed     bool
	closedLock sync.Mutex
}
```
Core functions include:
- The producer method
- The consumer method
- The Resync mechanism

Producer method

After the Reflector detects a resource change, it adds the change (Add, Update, or Delete) to DeltaFIFO; that is the producer side of the queue. The entry point is r.store.Add(event.Object), described earlier in watchHandler.
```go
func (f *DeltaFIFO) Add(obj interface{}) error {
	...
	return f.queueActionLocked(Added, obj)
}

func (f *DeltaFIFO) Update(obj interface{}) error {
	...
	return f.queueActionLocked(Updated, obj)
}

func (f *DeltaFIFO) Delete(obj interface{}) error {
	...
	return f.queueActionLocked(Deleted, obj)
}

// Taking Update as an example (the others are similar), each one calls queueActionLocked internally
func (f *DeltaFIFO) Update(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	f.populated = true
	// Internally calls queueActionLocked
	return f.queueActionLocked(Updated, obj)
}
```
QueueActionLocked method
```go
// Wraps events as Deltas and enqueues them for consumption (by HandleDeltas)
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	// Compute the key from the resource object, usually in namespace/name format
	id, err := f.KeyOf(obj)
	// Wrap the watch event type and the resource object into a Delta and append it
	newDeltas := append(f.items[id], Delta{actionType, obj})
	// Deduplicate the deltas
	newDeltas = dedupDeltas(newDeltas)
	// Add the Delta object to the queue
	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		delete(f.items, id)
	}
	return nil
}
```
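A small sketch of this behavior, driving a DeltaFIFO directly (illustrative; it assumes the client-go version used in this article, where Pop's callback takes a single argument):

```go
// Sketch: two events on the same object accumulate as Deltas under one key.
fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
	KeyFunction: cache.MetaNamespaceKeyFunc,
})
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"}}
_ = fifo.Add(pod)    // queues Delta{Added, pod} under key "default/demo"
_ = fifo.Update(pod) // appends Delta{Updated, pod} to the same key
_, _ = fifo.Pop(func(obj interface{}) error {
	for _, d := range obj.(cache.Deltas) {
		fmt.Println(d.Type) // prints Added, then Updated
	}
	return nil
})
```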
Consumer method: processLoop
Having analyzed Reflector's Run method, the next step is the controller's processLoop method.
```go
func (c *controller) Run(stopCh <-chan struct{}) {
	// Call NewReflector to initialize a Reflector
	// The ListerWatcher data interface object must be passed in
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	...
	// Call Reflector's Run method to start watching and handling events
	wg.StartWithChannel(stopCh, r.Run)
	// processLoop is responsible for fetching data from DeltaFIFO and consuming it
	wait.Until(c.processLoop, time.Second, stopCh)
}
```
```go
func (c *controller) processLoop() {
	for {
		// Pop data from the DeltaFIFO queue and hand it to process
		// The process function is stored in config.Process, which is the HandleDeltas passed in earlier
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		...
	}
}
```
The Pop method
DeltaFIFO's consumer method is Pop, which requires a process function to be passed in as the callback that receives and handles the object.
```go
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, Pop blocks until new data is queued
			// If Close has been called, the closed flag is set and broadcast
			if f.closed {
				return nil, ErrFIFOClosed
			}
			f.cond.Wait()
		}
		// There is data in the queue
		id := f.queue[0]
		...
		// Hand the data to the upper-layer callback function for processing
		err := process(item)
		// If an error occurred, put the data back into the queue
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}
```
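The ErrRequeue branch gives the process callback a way to put an item back. A hypothetical sketch (isTransient is an illustrative helper standing in for real business logic, not client-go API):

```go
// Sketch: returning cache.ErrRequeue from the process callback re-enqueues the item.
obj, err := fifo.Pop(func(item interface{}) error {
	if isTransient(item) { // hypothetical check
		return cache.ErrRequeue{Err: fmt.Errorf("transient failure, retry later")}
	}
	return nil
})
```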
Process function: HandleDeltas
- The Run method passed in a callback called HandleDeltas
- The process function executed inside processLoop's Pop is the HandleDeltas passed in above

Core logic includes:

- Updating the local cache cacheStorage, which is essentially updating the threadSafeMap data structure
- Notifying all listeners of the event, which simply adds the data to each listener's addCh for the consumer to consume
```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	// Iterate over all Deltas of the resource
	for _, d := range obj.(Deltas) {
		// Determine the operation type
		switch d.Type {
		// For the following types, store the resource in the Indexer
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				// The implementation of Indexer is the cache struct described earlier
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				isSync := false
				switch {
				case d.Type == Sync:
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				// Distribute the resource object to SharedInformer's event handlers
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

// Update the local cache cacheStorage,
// i.e. update the threadSafeMap data structure, whose initialization was described earlier
func (c *cache) Update(obj interface{}) error {
	key, err := c.keyFunc(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	c.cacheStorage.Update(key, obj)
	return nil
}

// distribute function
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	...
	if sync {
		for _, listener := range p.syncingListeners {
			// Core method
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

// The listener.add method, where events are added to the listener's addCh channel
// This answers the earlier question of who produces data into p.addCh
func (p *processorListener) add(notification interface{}) {
	// Notification objects of all types are added to the channel,
	// which is consumed by processorListener's run method
	p.addCh <- notification
}
```
Resync mechanism
The ListAndWatch method has three steps:

- List
- Resync
- Watch

Resync synchronizes the resource objects stored locally in the Indexer back into DeltaFIFO, with the Delta type set to Sync. The Reflector runs this periodically.
```go
func (f *DeltaFIFO) Resync() error {
	// Get the keys of the objects in the Indexer local store
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}
```
Indexer
As mentioned in the previous analysis, resource changes are saved to the local Indexer, which is described here.

- The Indexer is the local storage that client-go uses to store resource objects, with its own indexing function
- Resource objects consumed from DeltaFIFO are stored in the Indexer
- The Indexer's data is consistent with etcd, and client-go can read it locally, reducing the pressure on the API Server
Four important data structures
Source location: vendor/k8s.io/client-go/tools/cache/index.go
```go
// Index stores cached data
// type Empty struct{}
// type String map[string]Empty
// sets.String simulates a set with a map whose values are empty structs
type Index map[string]sets.String

// IndexFunc receives a resource object and returns its list of index values
type IndexFunc func(obj interface{}) ([]string, error)

// Indexers stores the indexers; key is the indexer name, value is the indexer implementation function
type Indexers map[string]IndexFunc

// Indices stores the caches; key is the cache name, value is the cached data
type Indices map[string]Index
```
ThreadSafeMap
The Indexer is built on ThreadSafeMap, so let's look at ThreadSafeMap first:

- ThreadSafeMap is an in-memory store; data is not written to disk
- Every add, delete, update, and query operation takes a lock to ensure data consistency
- It is used internally by the Indexer implementation, cache
Source location: k8s.io/client-go/tools/cache/thread_safe_store.go
```go
type threadSafeMap struct {
	lock sync.RWMutex
	// The map structure stores the resource data
	// The map key is computed by the keyFunc function; by default MetaNamespaceKeyFunc is used,
	// which computes a key of the form <namespace>/<name> from the resource object
	// The value is the resource object itself
	items map[string]interface{}
	// Stores the indexers; key is the indexer name, value is the indexer implementation function
	indexers Indexers
	// Stores the caches; key is the cache name, value is the cached resource object data
	indices Indices
}

// ByIndex returns index results by executing an indexer function
// Two parameters are required: the indexer name and the value to look up
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
	// Find the specified indexer function
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
	// Find the index for this indexer name
	index := c.indices[indexName]
	// Look up the matching keys and return the corresponding data
	set := index[indexedValue]
	list := make([]interface{}, 0, set.Len())
	for key := range set {
		list = append(list, c.items[key])
	}
	return list, nil
}
```
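As a usage sketch, a custom indexer can be registered on the shared index informer and queried through these structures ("byNode" is an illustrative index name; podInformer is the one from the demo, and AddIndexers must be called before the informer is started):

```go
// Sketch: index cached pods by the node they are scheduled on.
err := podInformer.AddIndexers(cache.Indexers{
	"byNode": func(obj interface{}) ([]string, error) {
		pod, ok := obj.(*corev1.Pod)
		if !ok {
			return nil, fmt.Errorf("expected *corev1.Pod, got %T", obj)
		}
		return []string{pod.Spec.NodeName}, nil
	},
})
if err != nil {
	log.Printf("add indexer error: %v", err)
}
// After the cache has synced, fetch every cached pod on node-1 without touching the API Server
objs, err := podInformer.GetIndexer().ByIndex("byNode", "node-1")
```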
Conclusion
The Informer mechanism plays an important role in Kubernetes, and its source code is quite complex. While studying it, be sure to follow along with the diagram at the beginning of the article, otherwise it is easy to get lost. Queues and channels are used throughout to decouple the components. One personal observation: the analysis becomes clearer if you keep one core question in mind, namely who puts data into each channel and who takes data out of it.