This article is shared from the Huawei Cloud community post "Client-Go Source Code Analysis of Reflector" by Kaliarch.

1. Background

Reflector is the core component that makes an Informer reliable. It has to handle many tricky details: lost events, event receipt, failed event processing, and so on. A bare ListerWatcher has no reconnection or resynchronization mechanism, so its data can drift out of sync; it also responds to events synchronously, so a queue has to be introduced if complex processing would otherwise block it.

2. Reflector

As its name suggests, a Reflector reflects data from etcd into a local store (DeltaFIFO). The Reflector first fetches all resource object data through a List operation and saves it to the local store; it then Watches for resource changes and triggers the corresponding events, such as Add, Update, and Delete.

The Reflector structure is defined in staging/src/k8s.io/client-go/tools/cache/reflector.go:

// k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
	// name identifies this reflector; by default it is a file:line (e.g. reflector.go:125),
	// generated by the GetNameFromCallsite function in
	// k8s.io/apimachinery/pkg/util/naming/from_stack.go
	name string
	// The name of the type we expect to place in the store: the string form of
	// expectedGVK if provided, otherwise the string form of expectedType.
	// It is used for display only, never for parsing or comparison.
	expectedTypeName string
	// An example object of the type we expect to place in the store.
	// Only the type needs to be right, except that when that is
	// `unstructured.Unstructured` the object's `"apiVersion"` and
	// `"kind"` must also be right.
	expectedType reflect.Type
	// The GVK of the object we expect to place in the store if unstructured.
	expectedGVK *schema.GroupVersionKind
	// The destination store to sync up with the watch source
	store Store
	// listerWatcher is used to perform lists and watches
	listerWatcher ListerWatcher
	// backoffManager manages the backoff of ListWatch
	backoffManager wait.BackoffManager
	resyncPeriod time.Duration
	// ShouldResync is invoked periodically; whenever it returns true the
	// store's Resync operation is invoked
	ShouldResync func() bool
	// clock allows tests to manipulate time
	clock clock.Clock
	// paginatedResult defines whether pagination should be forced for list calls.
	// It is set based on the result of the initial list call.
	paginatedResult bool
	// Kubernetes resources are versioned in the apiserver;
	// lastSyncResourceVersion is the resource version of the last list or watch sync
	lastSyncResourceVersion string
	// isLastSyncResourceVersionUnavailable is true if the previous list or watch
	// request with lastSyncResourceVersion failed with an HTTP 410 (Gone) error
	isLastSyncResourceVersionUnavailable bool
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	lastSyncResourceVersionMutex sync.RWMutex
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64
}

3. Process

  • Reflector.Run() calls ListAndWatch(), which starts a child goroutine to execute the List; the main goroutine blocks until the List completes.
  • meta.ExtractList(list) converts the list result into a []runtime.Object slice.
  • r.syncWith(items, resourceVersion) performs the full sync by writing the list result into the store (DeltaFIFO).
  • r.resyncChan() is likewise driven from a child goroutine.
  • The watch loop repeatedly calls r.listerWatcher.Watch(options).
  • r.watchHandler incrementally syncs each runtime.Object into the store.
  • List performs a full sync only once, while Watch keeps performing incremental syncs.
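The steps above can be sketched as a toy list-then-watch loop over an in-memory source. This is an illustration of the flow only, ignoring error handling and the real API machinery; none of these names come from client-go:

```go
package main

import "fmt"

// event mirrors the shape of a watch event: a type plus an object key and value.
type event struct {
	typ string // "Added", "Updated", "Deleted"
	key string
	val string
}

// listThenWatch performs one full sync from a snapshot, then applies
// incremental events, mimicking the List-then-Watch split in ListAndWatch.
func listThenWatch(snapshot map[string]string, events []event) map[string]string {
	store := make(map[string]string)
	// 1. List: full synchronization, analogous to store.Replace in syncWith.
	for k, v := range snapshot {
		store[k] = v
	}
	// 2. Watch: incremental synchronization, analogous to watchHandler's switch.
	for _, e := range events {
		switch e.typ {
		case "Added", "Updated":
			store[e.key] = e.val
		case "Deleted":
			delete(store, e.key)
		}
	}
	return store
}

func main() {
	snap := map[string]string{"default/pod-a": "v1"}
	evs := []event{{"Added", "default/pod-b", "v1"}, {"Deleted", "default/pod-a", ""}}
	fmt.Println(listThenWatch(snap, evs)) // pod-a removed, pod-b added
}
```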

4. Key Reflector methods

4.1 Construction method

// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the specified resource object.
// The reflector only puts objects with type expectedType into the store, unless expectedType is nil.
// If resyncPeriod is non-zero, then the reflector periodically checks ShouldResync to decide
// whether to invoke the store's Resync operation; `ShouldResync==nil` means Resync should
// always be performed.
// This allows you to use reflectors to periodically process everything as well as
// incrementally process the objects that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	// The default Reflector name is file:line
	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

// NewNamedReflector is the same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
	realClock := &clock.RealClock{}
	r := &Reflector{
		name:          name,
		listerWatcher: lw,
		store:         store,
		// exponential backoff: base 800ms, cap 30s, reset after 2min, factor 2.0, jitter 1.0
		backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
		resyncPeriod:   resyncPeriod,
		clock:          realClock,
	}
	r.setExpectedType(expectedType)
	return r
}

// NewNamespaceKeyedIndexerAndReflector creates a new Indexer and a Reflector;
// the Indexer is keyed by namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
	indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
	reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
	return indexer, reflector
}

4.2 Run method

// Run repeatedly uses the reflector's ListAndWatch to fetch all the objects and
// subsequent deltas. Run exits when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

4.3 ListAndWatch

  • List logic: set pagination options, execute the list call, and sync the list result into the DeltaFIFO queue by calling the store's Replace method.
  • Periodic resync: runs as a goroutine and uses a timer to trigger synchronization periodically, performing the store's Resync operation.
  • Watch logic: inside a for loop, execute the watch call to obtain a result channel, then listen on that channel and process the incoming data.
// ListAndWatch first lists all items and gets the resource version at the time of the call,
// then uses that resource version to watch.
// It returns an error if ListAndWatch didn't even try to initialize the watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
	var resourceVersion string

	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

  // 1. List logic: set pagination options; execute the list call; sync the list result into the DeltaFIFO queue
	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		defer initTrace.LogIfLong(10 * time.Second)
		var list runtime.Object
		var paginatedResult bool
		var err error
		listCh := make(chan struct{}, 1)
		panicCh := make(chan interface{}, 1)
		go func() {
			defer func() {
				if r := recover(); r != nil {
					panicCh <- r
				}
			}()
			// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
			// list request will return the full response.
			pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
				return r.listerWatcher.List(opts)
			}))
			switch {
			case r.WatchListPageSize != 0:
				pager.PageSize = r.WatchListPageSize
			case r.paginatedResult:
				// We got a paginated result initially. Assume this resource and server honor
				// paging requests (i.e. watch cache is probably disabled) and leave the default
				// pager size set.
			case options.ResourceVersion != "" && options.ResourceVersion != "0":
				// User didn't explicitly request pagination.
				//
				// With ResourceVersion != "", we have a possibility to list from watch cache,
				// but we do that (for ResourceVersion != "0") only if Limit is unset.
				// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
				// switch off pagination to force listing from watch cache (if enabled).
				// With the existing semantic of RV (result is at least as fresh as provided RV),
				// this is correct and doesn't lead to going back in time.
				//
				// We also don't turn off pagination for ResourceVersion="0", since watch cache
				// is ignoring Limit in that case anyway, and if watch cache is not enabled
				// we don't introduce regression.
				pager.PageSize = 0
			}

			list, paginatedResult, err = pager.List(context.Background(), options)
			if isExpiredError(err) || isTooLargeResourceVersionError(err) {
				r.setIsLastSyncResourceVersionUnavailable(true)
				// Retry immediately if the resource version used to list is unavailable.
				// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
				// continuation pages, but the pager might not be enabled, the full list might fail because the
				// resource version it is listing at is expired or the cache may not yet be synced to the provided
				// resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
				// the reflector makes forward progress.
				list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
			}
			close(listCh)
		}()
		select {
		case <-stopCh:
			return nil
		case r := <-panicCh:
			panic(r)
		case <-listCh:
		}
		if err != nil {
			return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
		}

		// We check if the list was paginated and if so set the paginatedResult based on that.
		// However, we want to do that only for the initial list (which is the only case
		// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
		// situations we may force listing directly from etcd (by setting ResourceVersion="")
		// which will return paginated result, even if watch cache is enabled. However, in
		// that case, we still want to prefer sending requests to watch cache if possible.
		//
		// Paginated result returned for request with ResourceVersion="0" mean that watch
		// cache is disabled and there are a lot of objects of a given type. In such case,
		// there is no need to prefer listing from watch cache.
		if options.ResourceVersion == "0" && paginatedResult {
			r.paginatedResult = true
		}

		r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
		initTrace.Step("Objects listed")
    // Get the list's metadata accessor
		listMetaInterface, err := meta.ListAccessor(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v: %v", list, err)
		}
    // Get the resource version
		resourceVersion = listMetaInterface.GetResourceVersion()
		initTrace.Step("Resource version extracted")
    // Convert the list result into a slice of resource objects: runtime.Object into []runtime.Object
		items, err := meta.ExtractList(list)
		if err != nil {
			return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
		}
		initTrace.Step("Objects extracted")
    // Store the resources from the list, together with the resource version, in the store
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("unable to sync list result: %v", err)
		}
		initTrace.Step("SyncWith done")
		r.setLastSyncResourceVersion(resourceVersion)
		initTrace.Step("Resource version updated")
		return nil
	}(); err != nil {
		return err
	}

  // 2. Periodic resync: runs as a goroutine, using a timer to trigger synchronization periodically
	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	go func() {
		resyncCh, cleanup := r.resyncChan()
		defer func() {
			cleanup() // Call the last one written into cleanup
		}()
		for {
			select {
			case <-resyncCh:
			case <-stopCh:
				return
			case <-cancelCh:
				return
			}
      // If ShouldResync is nil or returns true, perform the store's Resync operation
			if r.ShouldResync == nil || r.ShouldResync() {
				klog.V(4).Infof("%s: forcing resync", r.name)
        // Sync the indexer's data with the DeltaFIFO
				if err := r.store.Resync(); err != nil {
					resyncerrc <- err
					return
				}
			}
			cleanup()
			resyncCh, cleanup = r.resyncChan()
		}
	}()

  // 3. Watch logic: inside a for loop, execute the watch call to obtain a result channel, then listen on it and process events
	for {
		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
		select {
		case <-stopCh:
			return nil
		default:
		}

		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		options = metav1.ListOptions{
			ResourceVersion: resourceVersion,
			// We want to avoid situations of hanging watchers. Stop any watchers that do not
			// receive any events within the timeout window.
			TimeoutSeconds: &timeoutSeconds,
			// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
			// Reflector doesn't assume bookmarks are returned at all (if the server does not support
			// watch bookmarks, it will ignore this field).
			AllowWatchBookmarks: true,
		}

		// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
		start := r.clock.Now()
		w, err := r.listerWatcher.Watch(options)
		if err != nil {
			// If this is "connection refused" error, it means that most likely apiserver is not responsive.
			// It doesn't make sense to re-list all objects because most likely we will be able to restart
			// watch where we ended.
			// If that's the case begin exponentially backing off and resend watch request.
			if utilnet.IsConnectionRefused(err) {
				<-r.initConnBackoffManager.Backoff().C()
				continue
			}
			return err
		}

		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
					// has a semantic that it returns data at least as fresh as provided RV.
					// So first try to LIST with setting RV to resource version of last observed object.
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}

4.4 LastSyncResourceVersion

Returns the resource version of the last sync:

func (r *Reflector) LastSyncResourceVersion() string {
	r.lastSyncResourceVersionMutex.RLock()
	defer r.lastSyncResourceVersionMutex.RUnlock()
	return r.lastSyncResourceVersion
}

4.5 resyncChan

resyncChan returns a timer channel and a cleanup function; the cleanup function stops the timer. Periodic resynchronization is driven by this timer.

func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
	if r.resyncPeriod == 0 {
		return neverExitWatch, func() bool { return false }
	}
	// The cleanup function is required: imagine the scenario where watches
	// always fail so we end up listing frequently. Then, if we don't
	// manually stop the timer, we could end up with many timers active
	// concurrently.
	t := r.clock.NewTimer(r.resyncPeriod)
	return t.C(), t.Stop
}

4.6 syncWith

syncWith synchronizes the list result fetched from the apiserver into the DeltaFIFO queue by calling the queue's Replace method.

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
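Replace is what gives the list its full-sync semantics: everything in the new snapshot is upserted, and anything absent from it is treated as deleted. A map-based sketch of that contract (this illustrates the semantics only; the real DeltaFIFO instead emits Replaced/Deleted deltas for consumers to process):

```go
package main

import "fmt"

// replace applies full-sync semantics to a map-backed store: objects present
// in items are added or updated, objects missing from items are removed.
func replace(store map[string]string, items map[string]string) {
	for k := range store {
		if _, ok := items[k]; !ok {
			delete(store, k) // absent from the new list => deleted
		}
	}
	for k, v := range items {
		store[k] = v // present in the new list => add or update
	}
}

func main() {
	store := map[string]string{"default/pod-a": "v1", "default/pod-b": "v1"}
	replace(store, map[string]string{"default/pod-b": "v2", "default/pod-c": "v1"})
	// pod-a is gone, pod-b is updated, pod-c is added
	fmt.Println(len(store), store["default/pod-b"], store["default/pod-c"])
}
```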

4.7 watchHandler

watchHandler processes the watch: it receives the watch.Interface as a parameter. That interface exposes two methods, Stop and ResultChan; the former closes the result channel, and the latter returns it.

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0

	// Stopping the watcher should be idempotent and if we return from this function there's no way
	// we're coming back in with the same watch interface.
	defer w.Stop()

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			if !ok {
				break loop
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}
			// Check whether the watched event's object type matches the expected type
			if r.expectedType != nil {
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			// Check whether the watched event's object GVK matches the expected GVK
			if r.expectedGVK != nil {
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			// Dispatch on the event type and update the store accordingly
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			if rvu, ok := r.store.(ResourceVersionUpdater); ok {
				rvu.UpdateResourceVersion(newResourceVersion)
			}
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}

4.8 relistResourceVersion

The relistResourceVersion function determines the resource version to list (or relist) from. If the resource version is not 0, listing continues from that version, so if a transfer is interrupted by a network failure or similar, the next reconnection can resume from where it left off and keep the local cache consistent with the data in the etcd cluster. The function is implemented as follows:

// relistResourceVersion determines the resource version the reflector should list or relist from.
// If the last relist resulted in an HTTP 410 (Gone) status code, it returns "" so the relist
// will read the latest resource version available in etcd via a quorum read.
// Otherwise it returns lastSyncResourceVersion, so the reflector continues from where it left off.
func (r *Reflector) relistResourceVersion() string {
	r.lastSyncResourceVersionMutex.RLock()
	defer r.lastSyncResourceVersionMutex.RUnlock()

	if r.isLastSyncResourceVersionUnavailable {
		// Since the reflector makes paginated list requests, if lastSyncResourceVersion is
		// expired, all paginated list requests skip the watch cache. So set ResourceVersion=""
		// and list again to re-establish the reflector at the latest available ResourceVersion,
		// using a consistent read from etcd.
		return ""
	}
	if r.lastSyncResourceVersion == "" {
		// For performing an initial list the reflector uses resource version "0".
		return "0"
	}
	return r.lastSyncResourceVersion
}
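This decision table boils down to a small pure function, which can be sketched independently of the Reflector struct (names here are illustrative):

```go
package main

import "fmt"

// relistRV mirrors relistResourceVersion's decision logic:
//   unavailable last RV => ""  (quorum read from etcd)
//   no last RV yet      => "0" (initial list, may be served from cache)
//   otherwise           => continue from the last synced RV
func relistRV(lastRV string, unavailable bool) string {
	if unavailable {
		return ""
	}
	if lastRV == "" {
		return "0"
	}
	return lastRV
}

func main() {
	fmt.Println(relistRV("", false))      // initial list: "0"
	fmt.Println(relistRV("12345", false)) // resume: "12345"
	fmt.Println(relistRV("12345", true))  // 410 Gone: ""
}
```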

5. Overall flow

// k8s.io/client-go/informers/apps/v1/deployment.go

// NewFilteredDeploymentInformer constructs a new informer for Deployment.
// Always prefer using an informer factory to get a shared informer instead of a
// separate one, to reduce memory footprint and the number of server connections.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}

From the code above we can see that when we call a resource object's Informer(), it invokes the NewFilteredDeploymentInformer initialization function shown above, passing in a cache.ListWatch object that contains the concrete List and Watch implementations. In other words, the ListerWatcher operations the reflector calls in ListAndWatch are implemented in the Informer of each concrete resource object. For example, the Deployment resource list is fetched through the clientset client that talks to the apiserver, via client.AppsV1().Deployments(namespace).List(context.TODO(), options) in ListFunc.

  1. After the full List data is obtained, `listMetaInterface.GetResourceVersion()` retrieves the resource version. ResourceVersion is very important: every resource in Kubernetes has this field, and it identifies the current version of the resource object. Each time the object is modified (created, updated, or deleted), the Kubernetes API Server changes its ResourceVersion, so when client-go executes a Watch operation, it can use the ResourceVersion to determine whether the resource object has changed.
  2. The `meta.ExtractList` function then converts the list result into a list of resource objects, turning a `runtime.Object` into a `[]runtime.Object`, since the full fetch returns a list of resources.
  3. Next, the reflector's `syncWith` function stores the resource objects and the resource version from the list in the Store, as described in a later section.
  4. After processing completes, `r.setLastSyncResourceVersion(resourceVersion)` records the latest resource version; a goroutine also periodically checks whether a Resync operation is needed and calls the store's `r.store.Resync()` to execute it.
  5. The next operation is the Watch, which establishes a long-lived HTTP connection to the apiserver and receives resource change events from it. The actual Watch implementation is also passed in when the concrete Informer is initialized, such as WatchFunc in the Deployment Informer above; underneath, the Watch on Deployments is performed through the clientset client via `client.AppsV1().Deployments(namespace).Watch(context.TODO(), options)`.
  6. After watch data is received, `r.watchHandler` is called; when Add, Update, or Delete events fire, the corresponding resource object is updated in the local cache (DeltaFIFO) and the ResourceVersion is updated.

This is the core ListAndWatch implementation of Reflector. From the implementation above we can see that the fetched data ultimately flows into the local store, namely DeltaFIFO, so next we need to analyze the DeltaFIFO implementation.

6. Test

package main

import (
	"fmt"
	"path/filepath"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func Must(e interface{}) {
	if e != nil {
		panic(e)
	}
}

func InitClientSet() (*kubernetes.Clientset, error) {
	kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
	restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(restConfig)
}

// InitListerWatcher builds a ListerWatcher
func InitListerWatcher(clientSet *kubernetes.Clientset, resource, namespace string, fieldSelector fields.Selector) cache.ListerWatcher {
	restClient := clientSet.CoreV1().RESTClient()
	return cache.NewListWatchFromClient(restClient, resource, namespace, fieldSelector)
}

// InitPodsReflector builds a reflector for pods
func InitPodsReflector(clientSet *kubernetes.Clientset, store cache.Store) *cache.Reflector {
	resource := "pods"
	namespace := "default"
	resyncPeriod := 0 * time.Second
	expectedType := &corev1.Pod{}
	lw := InitListerWatcher(clientSet, resource, namespace, fields.Everything())
	return cache.NewReflector(lw, expectedType, store, resyncPeriod)
}

// InitDeltaQueue builds a DeltaFIFO
func InitDeltaQueue(store cache.Store) cache.Queue {
	return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		// store implements KeyListerGetter
		KnownObjects: store,
		// EmitDeltaTypeReplaced indicates that the queue consumer understands the Replaced DeltaType.
		// Before the `Replaced` event type was added, calls to Replace() were handled the same as Sync().
		// For backwards-compatibility purposes it defaults to false: when true, a `Replaced` event
		// is emitted for items passed to a Replace() call; when false, a `Sync` event is emitted.
		EmitDeltaTypeReplaced: true,
	})
}

func InitStore() cache.Store {
	return cache.NewStore(cache.MetaNamespaceKeyFunc)
}

func main() {
	clientSet, err := InitClientSet()
	Must(err)
	// store is used to look up objects in ProcessFunc
	store := InitStore()
	// the delta queue
	DeleteFIFOQueue := InitDeltaQueue(store)
	podReflector := InitPodsReflector(clientSet, DeleteFIFOQueue)

	stopCh := make(chan struct{})
	defer close(stopCh)
	go podReflector.Run(stopCh)

	// process deltas in the order they were received
	ProcessFunc := func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			switch d.Type {
			case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
				if _, exists, err := store.Get(d.Object); err == nil && exists {
					if err := store.Update(d.Object); err != nil {
						return err
					}
				} else {
					if err := store.Add(d.Object); err != nil {
						return err
					}
				}
			case cache.Deleted:
				if err := store.Delete(d.Object); err != nil {
					return err
				}
			}
			pods, ok := d.Object.(*corev1.Pod)
			if !ok {
				return fmt.Errorf("not config: %T", d.Object)
			}
			fmt.Printf("Type:%s: Name:%s\n", d.Type, pods.Name)
		}
		return nil
	}

	fmt.Println("Start syncing...")

	wait.Until(func() {
		for {
			_, err := DeleteFIFOQueue.Pop(ProcessFunc)
			Must(err)
		}
	}, time.Second, stopCh)
}

7. Summary

This article walked through how Reflector uses a ListerWatcher to fetch objects from the Kubernetes API and store them in a store. Next we will read the DeltaFIFO source code and combine it with the informer to deepen our understanding of the whole Informer.

Reference links

  • Blog.csdn.net/u013276277/…
  • www.jianshu.com/p/1daeae7b6…
