This article is shared from the Huawei Cloud community: "Client-go source code analysis of DeltaFIFO", author: Kaliarch.

As seen in the ListAndWatch method, the full List of resources ends up calling the Replace method of the Store passed to the Reflector, and periodic synchronization calls the Store's Resync method. This article walks through the source code to show how both are implemented.

DeltaFIFO is the queue client-go uses to hold objects awaiting processing. Unlike a traditional FIFO, it not only keeps data in first-in, first-out order but also records the type of change made to each K8s resource object. It is the channel between the Reflector (producer) and the Indexer (consumer).

1. Source code

Here we focus on the most important attributes and methods of DeltaFIFO. For the finer details of Queue, FIFO, and the remaining methods, refer to the full source code.

1.1 DeltaFIFO structure

    type Delta struct {
        Type   DeltaType
        Object interface{}
    }

    type DeltaType string

    // Change type definitions
    const (
        Added   DeltaType = "Added"
        Updated DeltaType = "Updated"
        Deleted DeltaType = "Deleted"
        // Replaced is emitted when a watch error forces a relist.
        // We don't know if the replaced object has changed.
        Replaced DeltaType = "Replaced"
        // Sync is for synthetic events during a periodic resync.
        Sync DeltaType = "Sync"
    )

    type DeltaFIFO struct {
        // lock/cond protect access to 'items' and 'queue'.
        lock sync.RWMutex
        // cond is a condition variable: a rendezvous point for goroutines
        // waiting for or announcing an event. Each Cond has an associated
        // lock L (usually a *Mutex or *RWMutex) that must be held when the
        // condition changes and when Wait is called. A Cond must not be
        // copied after first use.
        cond sync.Cond

        // 'items' maps a key to a Deltas: the list of changes recorded
        // for one object.
        items map[string]Deltas

        // 'queue' maintains FIFO order of keys for consumption in Pop().
        // There are no duplicates in 'queue'.
        queue []string

        // populated is true once the first batch of items has been inserted
        // by Replace(), or Add/Update/Delete was called first.
        populated bool
        // initialPopulationCount is the number of items inserted by the
        // first call of Replace().
        initialPopulationCount int

        // keyFunc computes the key of an item.
        keyFunc KeyFunc

        // knownObjects is the Indexer.
        knownObjects KeyListerGetter

        // closed indicates that the queue has been closed.
        closed bool

        // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
        // DeltaType when Replace() is called (to preserve backwards compat).
        emitDeltaTypeReplaced bool
    }

The source code shows that there are five Delta types. Added, Updated, and Deleted, as the names imply, track object changes observed through Watch. Replaced and Sync cover the initial list and exceptional cases (a relist after a watch error, and periodic resync), and keep the data in the Indexer consistent with etcd.

We can see several important properties in DeltaFIFO.

  • queue: stores the keys of the resource objects, in FIFO order.
  • items: stores the sequence of changes to each object; the key is the value computed by keyFunc and the value is that object's list of Deltas.
  • keyFunc: the function used to compute the keys of items.

DeltaFIFO records the operation type along with each resource object, so the same object can be queued under several operation types. The keys in queue are computed by KeyOf; items is a map whose values are the Deltas slices of the corresponding objects.

For example, when a user creates a Pod, the resulting Delta holds that Pod with type Added. The type lets the controller run different business logic for each kind of change.
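To make the relationship between queue and items concrete, here is a minimal sketch. It is not client-go code: miniFIFO and record are hypothetical names, locking is omitted, and objects are plain strings. It only shows that a key is enqueued once while its deltas accumulate.

```go
package main

import "fmt"

// DeltaType and Delta mirror the shapes shown above; everything else in
// this file is a simplified stand-in, not client-go code.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// miniFIFO is a hypothetical, lock-free model of the two core fields.
type miniFIFO struct {
	items map[string]Deltas
	queue []string
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string]Deltas{}}
}

// record appends a delta for key and enqueues the key only the first time
// the key is seen, so queue never contains duplicates.
func (f *miniFIFO) record(key string, t DeltaType, obj interface{}) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], Delta{Type: t, Object: obj})
}

func main() {
	f := newMiniFIFO()
	f.record("default/nginx", Added, "nginx v1")
	f.record("default/redis", Added, "redis v1")
	f.record("default/nginx", Updated, "nginx v2")

	fmt.Println("queue:", f.queue)
	for _, d := range f.items["default/nginx"] {
		fmt.Println("default/nginx:", d.Type, d.Object)
	}
}
```

After the three calls, queue holds two keys in arrival order, and the entry for default/nginx holds two deltas (Added, then Updated).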

1.2 queueActionLocked

Looking at DeltaFIFO methods such as Add, Update, and Delete, we find that they all call queueActionLocked.

    func (f *DeltaFIFO) Add(obj interface{}) error {
        f.lock.Lock()
        defer f.lock.Unlock()
        f.populated = true
        return f.queueActionLocked(Added, obj)
    }
  • queueActionLocked

The general steps are: compute the object's key, append the new Delta to the object's existing Deltas, collapse duplicate deletions with dedupDeltas, append the key to queue if the object is not already in items, store the new Deltas in items, and wake any consumers blocked in Pop.

    func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
        // Compute the object's key
        id, err := f.KeyOf(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        // Temporarily hold the old Deltas
        oldDeltas := f.items[id]
        // Append the new Delta
        newDeltas := append(oldDeltas, Delta{actionType, obj})
        // Collapse duplicate deletions
        newDeltas = dedupDeltas(newDeltas)

        if len(newDeltas) > 0 {
            if _, exists := f.items[id]; !exists {
                f.queue = append(f.queue, id)
            }
            f.items[id] = newDeltas
            // Notify all consumers to unblock
            f.cond.Broadcast()
        } else {
            // This never happens, because dedupDeltas never returns an empty list
            // when given a non-empty list (as it is here).
            // If somehow it happens anyway, deal with it but complain.
            if oldDeltas == nil {
                klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
                return nil
            }
            klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
            f.items[id] = newDeltas
            return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
        }
        return nil
    }
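The deduplication step is not shown in this article, so the following is only a sketch of the idea under stated assumptions: it compares just the two most recent deltas and drops the repeated one when both are deletions. dedupLastTwo is a hypothetical name, and the real dedupDeltas helper may choose which of the two deletions to keep differently.

```go
package main

import "fmt"

type DeltaType string

const (
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// dedupLastTwo is a simplified stand-in for the dedup step: if the two
// newest deltas are both deletions, only the earlier one is kept.
// Assumption: comparing the last two entries is enough, because the step
// runs after every single append.
func dedupLastTwo(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	last, prev := deltas[n-1], deltas[n-2]
	if last.Type == Deleted && prev.Type == Deleted {
		// Copy into a fresh slice so the caller's slice is not modified.
		return append(Deltas{}, deltas[:n-1]...)
	}
	return deltas
}

func main() {
	d := Deltas{{Updated, "pod v2"}, {Deleted, "pod v2"}, {Deleted, "pod v2"}}
	for _, delta := range dedupLastTwo(d) {
		fmt.Println(delta.Type)
	}
}
```

Note that a non-empty input never produces an empty result here, which matches the "this never happens" branch in queueActionLocked.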
  • KeyOf

    queueActionLocked uses the KeyOf method to compute the key of an object. KeyOf first checks whether the object is a Deltas slice and, if so, uses the object of its newest Delta; a DeletedFinalStateUnknown object returns its stored key directly. Otherwise the key comes from the keyFunc supplied when the DeltaFIFO was created. The default is MetaNamespaceKeyFunc, which returns <namespace>/<name>, or just <name> when the namespace is empty.

    func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
        if d, ok := obj.(Deltas); ok {
            if len(d) == 0 {
                return "", KeyError{obj, ErrZeroLengthDeltasObject}
            }
            // Compute the key from the newest version of the object
            obj = d.Newest().Object
        }
        if d, ok := obj.(DeletedFinalStateUnknown); ok {
            return d.Key, nil
        }
        // Otherwise use the KeyFunc passed in when the DeltaFIFO was initialized
        return f.keyFunc(obj)
    }

    // Newest returns the newest Delta, or nil if there are none.
    func (d Deltas) Newest() *Delta {
        if n := len(d); n > 0 {
            return &d[n-1]
        }
        return nil
    }
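The key lookup order can be sketched without any Kubernetes dependency. In the example below, namedObject, tombstone, delta, namespaceKey, and keyOf are hypothetical stand-ins for the API object metadata, DeletedFinalStateUnknown, Delta, the namespace/name key format, and KeyOf respectively; it illustrates the order of checks, not the library's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// namedObject is a hypothetical stand-in for an API object's metadata.
type namedObject struct {
	Namespace string
	Name      string
}

// tombstone stands in for DeletedFinalStateUnknown: it carries its own key.
type tombstone struct {
	Key string
	Obj interface{}
}

// delta stands in for Delta; only the object matters for key computation.
type delta struct {
	Object interface{}
}

// namespaceKey follows the <namespace>/<name> format, falling back to
// <name> alone for objects without a namespace.
func namespaceKey(o namedObject) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// keyOf mirrors the order of checks described above: unwrap a delta list
// to its newest object, short-circuit on a tombstone, else compute the key.
func keyOf(obj interface{}) (string, error) {
	if ds, ok := obj.([]delta); ok {
		if len(ds) == 0 {
			return "", errors.New("zero-length delta list")
		}
		obj = ds[len(ds)-1].Object
	}
	switch o := obj.(type) {
	case tombstone:
		return o.Key, nil
	case namedObject:
		return namespaceKey(o), nil
	}
	return "", fmt.Errorf("unsupported object type %T", obj)
}

func main() {
	pod := namedObject{Namespace: "default", Name: "nginx"}
	fmt.Println(keyOf(pod))
	fmt.Println(keyOf(namedObject{Name: "node-1"}))
	fmt.Println(keyOf([]delta{{Object: pod}, {Object: tombstone{Key: "default/nginx"}}}))
}
```

The tombstone case is what lets a deletion be keyed even when the final state of the object is unknown.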

1.3 Replace

In the previous Reflector article, we saw that in the ListAndWatch method the full List of resources ends up calling the Replace method of the Store passed to the Reflector.

The Replace method performs a full update of the objects. Because DeltaFIFO emits incremental changes, every full update must also detect objects that have been deleted, since the deletion event may have been missed before the relist. This differs from the cache, which is an in-memory map of full objects, so its Replace() simply rebuilds the map.

    func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
        f.lock.Lock()
        defer f.lock.Unlock()
        // Create the set of keys
        keys := make(sets.String, len(list))

        // Keep backwards compatibility for old clients
        action := Sync
        if f.emitDeltaTypeReplaced {
            action = Replaced
        }

        // Iterate over the incoming list and add each object to the DeltaFIFO
        for _, item := range list {
            // Compute the object's key
            key, err := f.KeyOf(item)
            if err != nil {
                return KeyError{item, err}
            }
            keys.Insert(key)
            if err := f.queueActionLocked(action, item); err != nil {
                return fmt.Errorf("couldn't enqueue object: %v", err)
            }
        }

        // If there is no Indexer, detect deletions against our own items:
        // any queued object that is missing from the new list is deleted.
        if f.knownObjects == nil {
            // Do deletion detection against our own list.
            queuedDeletions := 0
            for k, oldItem := range f.items {
                if keys.Has(k) {
                    continue
                }
                // Delete pre-existing items not in the new list.
                // This could happen if watch deletion event was missed while
                // disconnected from apiserver.
                var deletedObj interface{}
                if n := oldItem.Newest(); n != nil {
                    deletedObj = n.Object
                }
                queuedDeletions++
                // The final state of the object is unknown, so use DeletedFinalStateUnknown
                if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                    return err
                }
            }

            // If populated is false, this is the first batch of objects
            if !f.populated {
                f.populated = true
                f.initialPopulationCount = keys.Len() + queuedDeletions
            }
            return nil
        }

        // Detect deletions against the keys known to the Indexer
        knownKeys := f.knownObjects.ListKeys()
        queuedDeletions := 0
        for _, k := range knownKeys {
            if keys.Has(k) {
                continue
            }

            deletedObj, exists, err := f.knownObjects.GetByKey(k)
            if err != nil {
                deletedObj = nil
                klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
            } else if !exists {
                deletedObj = nil
                klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
            }
            queuedDeletions++
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }

        if !f.populated {
            f.populated = true
            f.initialPopulationCount = keys.Len() + queuedDeletions
        }
        return nil
    }
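The deletion detection in Replace is essentially a set difference: every key that was known before but is absent from the fresh list must have been deleted. A minimal sketch of just that step, where missingKeys is a hypothetical helper name and the result is sorted only to make the output deterministic:

```go
package main

import (
	"fmt"
	"sort"
)

// missingKeys returns the keys present in known but absent from listed.
// In Replace, each such key would be enqueued as a Deleted delta wrapped
// in DeletedFinalStateUnknown.
func missingKeys(known, listed []string) []string {
	seen := make(map[string]struct{}, len(listed))
	for _, k := range listed {
		seen[k] = struct{}{}
	}
	var missing []string
	for _, k := range known {
		if _, ok := seen[k]; !ok {
			missing = append(missing, k)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	known := []string{"default/a", "default/b", "default/c"}
	listed := []string{"default/a", "default/c", "default/d"}
	fmt.Println(missingKeys(known, listed)) // prints [default/b]
}
```

Here default/d is new and is handled by the Sync or Replaced delta queued in the first loop, while default/b is the missed deletion.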

1.4 Resync

During periodic synchronization, the Store's Resync method is called. Resync enqueues a Delta of type Sync for each object in the Indexer. If f.knownObjects (the Indexer) is nil, Resync does nothing.

    func (f *DeltaFIFO) Resync() error {
        f.lock.Lock()
        defer f.lock.Unlock()

        // If there is no indexer, there is nothing to resync
        if f.knownObjects == nil {
            return nil
        }

        // Get the keys from the indexer
        keys := f.knownObjects.ListKeys()
        for _, k := range keys {
            if err := f.syncKeyLocked(k); err != nil {
                return err
            }
        }
        return nil
    }

    func (f *DeltaFIFO) syncKeyLocked(key string) error {
        // Get the object by key
        obj, exists, err := f.knownObjects.GetByKey(key)
        if err != nil {
            klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
            return nil
        } else if !exists {
            klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
            return nil
        }

        // If we are doing Resync() and there is already an event queued for that object,
        // we ignore the Resync for it. This is to avoid the race, in which the resync
        // comes with the previous value of object (since queueing an event for the object
        // doesn't trigger changing the underlying store <knownObjects>.
        // Compute the key via KeyOf
        id, err := f.KeyOf(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        if len(f.items[id]) > 0 {
            return nil
        }

        // Queue a Sync Delta for the object
        if err := f.queueActionLocked(Sync, obj); err != nil {
            return fmt.Errorf("couldn't queue object: %v", err)
        }
        return nil
    }
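The skip rule in syncKeyLocked is the interesting part: a key that already has pending deltas is left alone, so a stale copy from the store cannot be queued behind a newer event. A small sketch of that decision, where resyncKeys and the pending map are hypothetical simplifications of the Indexer keys and of len(f.items[id]):

```go
package main

import "fmt"

// resyncKeys returns the keys a resync would enqueue as Sync deltas:
// every key in the store that has no deltas waiting in the queue.
// pending maps a key to the number of deltas currently queued for it.
func resyncKeys(storeKeys []string, pending map[string]int) []string {
	var out []string
	for _, k := range storeKeys {
		if pending[k] > 0 {
			// A newer event is already queued; a Sync carrying the stored
			// (older) object would only add stale work.
			continue
		}
		out = append(out, k)
	}
	return out
}

func main() {
	storeKeys := []string{"default/a", "default/b", "default/c"}
	pending := map[string]int{"default/b": 2}
	fmt.Println(resyncKeys(storeKeys, pending)) // prints [default/a default/c]
}
```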

1.5 Pop

Finally, we’ll look at the consumption of objects in DeltaFIFO, which actually uses the Pop function. The processing of the data flow is done through PopProcessFunc. Pop waits until an element is ready to process, and if more than one element is ready, returns in the order in which they were added or updated. Before processing, the element is removed from the queue (and storage), so if it is not processed successfully, it should be added back with the AddIfNotPresent() function.

The process function is called while the lock is held, so it can safely update data structures that must stay in sync with the queue.

    func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for {
            for len(f.queue) == 0 {
                // When the queue is empty, the Pop() call blocks until a new element is inserted.
                // When Close() is called, f.closed is set and the condition is broadcast.
                if f.closed {
                    return nil, ErrFIFOClosed
                }
                f.cond.Wait()
            }
            // Take the first key in the queue
            id := f.queue[0]
            // Remove it from the queue
            f.queue = f.queue[1:]
            if f.initialPopulationCount > 0 {
                f.initialPopulationCount--
            }
            // Fetch the popped item
            item, ok := f.items[id]
            if !ok {
                // This should never happen
                klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
                continue
            }
            // Remove it from items
            delete(f.items, id)
            err := process(item)
            if e, ok := err.(ErrRequeue); ok {
                // If processing failed, put the item back
                f.addIfNotPresent(id, item)
                err = e.Err
            }
            // Don't need to copyDeltas here, because we're transferring
            // ownership to the caller.
            return item, err
        }
    }
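The blocking behaviour of Pop rests on a condition variable. The sketch below reduces it to a queue of string keys so the wait, broadcast, and requeue pattern is visible on its own. blockingQueue and its methods are hypothetical names, and as a simplification any processing error triggers a requeue, whereas the code above requeues only on ErrRequeue.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("queue closed")

// blockingQueue is a hypothetical, much-reduced model of the Pop loop.
type blockingQueue struct {
	lock   sync.Mutex
	cond   *sync.Cond
	keys   []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.lock)
	return q
}

// Add appends a key and wakes every goroutine blocked in Pop.
func (q *blockingQueue) Add(key string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.keys = append(q.keys, key)
	q.cond.Broadcast()
}

// Close marks the queue closed and wakes blocked consumers so they can exit.
func (q *blockingQueue) Close() {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks until a key is available, removes it, and runs process while
// still holding the lock. A key whose processing fails goes back to the tail.
func (q *blockingQueue) Pop(process func(string) error) (string, error) {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.keys) == 0 {
		if q.closed {
			return "", errClosed
		}
		q.cond.Wait() // releases the lock while waiting, reacquires on wake-up
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	err := process(key)
	if err != nil {
		q.keys = append(q.keys, key)
	}
	return key, err
}

func main() {
	q := newBlockingQueue()
	go q.Add("default/nginx") // producer, standing in for the Reflector

	key, err := q.Pop(func(k string) error { return nil })
	fmt.Println(key, err)

	q.Close()
	_, err = q.Pop(func(k string) error { return nil })
	fmt.Println(err)
}
```

Because process runs under the lock, it must not call back into the queue, or it would deadlock on the same mutex.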

2. A quick test

    package main

    import (
        "fmt"
        "path/filepath"
        "time"

        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/client-go/util/homedir"
    )

    func Must(e interface{}) {
        if e != nil {
            panic(e)
        }
    }

    func InitClientSet() (*kubernetes.Clientset, error) {
        kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
        restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, err
        }
        return kubernetes.NewForConfig(restConfig)
    }

    // Build the ListerWatcher
    func InitListerWatcher(clientSet *kubernetes.Clientset, resource, namespace string, fieldSelector fields.Selector) cache.ListerWatcher {
        restClient := clientSet.CoreV1().RESTClient()
        return cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
    }

    // Build the Pods reflector
    func InitPodsReflector(clientSet *kubernetes.Clientset, store cache.Store) *cache.Reflector {
        resource := "pods"
        namespace := "default"
        resyncPeriod := 0 * time.Second
        expectedType := &corev1.Pod{}
        lw := InitListerWatcher(clientSet, resource, namespace, fields.Everything())
        return cache.NewReflector(lw, expectedType, store, resyncPeriod)
    }

    // Build the DeltaFIFO
    func InitDeltaQueue(store cache.Store) cache.Queue {
        return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
            // store implements KeyListerGetter
            KnownObjects: store,
            // EmitDeltaTypeReplaced indicates that the queue consumer
            // understands the Replaced DeltaType. Before the Replaced event
            // type was added, calls to Replace() were handled the same as Sync.
            // For backward compatibility it is false by default.
            // When true, a Replaced event is sent for each item passed to
            // Replace(). When false, a Sync event is sent instead.
            EmitDeltaTypeReplaced: true,
        })
    }

    func InitStore() cache.Store {
        return cache.NewStore(cache.MetaNamespaceKeyFunc)
    }

    func main() {
        clientSet, err := InitClientSet()
        Must(err)
        // The store is read and written inside ProcessFunc
        store := InitStore()
        // The queue
        DeleteFIFOQueue := InitDeltaQueue(store)
        // The reflector writes into the DeltaFIFO
        podReflector := InitPodsReflector(clientSet, DeleteFIFOQueue)

        stopCh := make(chan struct{})
        defer close(stopCh)
        go podReflector.Run(stopCh)

        // Element keys have the form namespace/name
        ProcessFunc := func(obj interface{}) error {
            // Deltas are processed in the order they were received
            for _, d := range obj.(cache.Deltas) {
                switch d.Type {
                case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
                    if _, exists, err := store.Get(d.Object); err == nil && exists {
                        if err := store.Update(d.Object); err != nil {
                            return err
                        }
                    } else {
                        if err := store.Add(d.Object); err != nil {
                            return err
                        }
                    }
                case cache.Deleted:
                    if err := store.Delete(d.Object); err != nil {
                        return err
                    }
                }
                pods, ok := d.Object.(*corev1.Pod)
                if !ok {
                    return fmt.Errorf("not a Pod: %T", d.Object)
                }
                fmt.Printf("Type:%s: Name:%s\n", d.Type, pods.Name)
            }
            return nil
        }

        fmt.Println("Start syncing...")
        wait.Until(func() {
            for {
                _, err := DeleteFIFOQueue.Pop(ProcessFunc)
                Must(err)
            }
        }, time.Second, stopCh)
    }

The program first creates the store, then the DeltaFIFO, then initializes the Reflector with the DeltaFIFO passed in as its store. Once the Reflector is running, it lists and watches the K8s API server and stores the listed data in the DeltaFIFO, and a custom ProcessFunc handles the elements popped from the DeltaFIFO.

3. Process summary

Reflector uses ListAndWatch to fetch the full set of resource objects and then calls DeltaFIFO's Replace() method to insert them all into the queue. If periodic resync is configured, the objects in the Indexer are periodically re-enqueued as Sync Deltas. After that, the Watch operation calls the Add, Update, or Delete method of DeltaFIFO according to the operation type of each resource object. What happens to the elements returned by Pop depends on the callback passed to it, PopProcessFunc.
