The author

Wang Cheng is a Tencent Cloud R&D engineer and Kubernetes contributor who works on containerizing database products and on resource management and control, with an interest in Kubernetes, Go, and the cloud native field.

Overview

Enter the K8s world and you'll find many controllers, each responsible for reconciling certain types of resources (for example, Pods are managed through DeploymentController and ReplicaSetController) with the goal of maintaining the state the user desires.

K8s has dozens of resource types. Informer exists so that internal and external users of K8s can conveniently and efficiently observe changes to resources of a given type. This article analyzes it component by component: Reflector, DeltaFIFO, Indexer, Controller, SharedInformer, processorListener, and workqueue (the event-processing work queue).

This article and subsequent ones are based on K8s v1.22.

(K8s – informer)

Starting with Reflector

Reflector's main job is to list resources from apiserver and then continuously watch for Add/Update/Delete events, storing them in a local Store implemented by DeltaFIFO.

First take a look at the Reflector structure definition:

// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
	// Name uniquely identified by file:line
	name string

	// The following three are used to confirm the type
	expectedTypeName string
	expectedType     reflect.Type
	expectedGVK      *schema.GroupVersionKind

	// Storage interface: specific by DeltaFIFO implementation of storage
	store Store
	// Use to pull full and incremental resources from apiserver
	listerWatcher ListerWatcher

	// The following two are used for failure retries
	backoffManager         wait.BackoffManager
	initConnBackoffManager wait.BackoffManager

	// Resynchronization interval for the informer user
	resyncPeriod time.Duration
	// Determine whether the conditions for resynchronization are met
	ShouldResync func() bool
	
	clock clock.Clock
	
	// Whether to page List
	paginatedResult bool
	
	// The version number of the last synchronized resource, based on which, watch will only listen for resources greater than this value
	lastSyncResourceVersion string
	// Whether the last synchronized resource version number is available
	isLastSyncResourceVersionUnavailable bool
	// add a lock control version number
	lastSyncResourceVersionMutex sync.RWMutex
	
	// Size per page
	WatchListPageSize int64
	// Callback invoked when Watch fails
	watchErrorHandler WatchErrorHandler
}

As the struct definition shows, ListAndWatch is performed for a specified target resource type, and pagination settings can be configured.

After the first full list of the target resource type is pulled, the syncWith function replaces the contents of the DeltaFIFO queue/items in full (Replace). Reflector then continuously watches for incremental events and, after deduplication, updates them into the DeltaFIFO queue/items, where they wait to be consumed.
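The list-then-watch flow described above can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: `fakeServer`, `event`, and `reflector` are hypothetical stand-ins invented for this example, not client-go types, and resource versions are plain integers here.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a watch event.
type event struct {
	kind string // "ADDED", "MODIFIED" or "DELETED"
	key  string
	rv   int // resource version at which the change happened
}

// fakeServer is a hypothetical in-memory apiserver.
type fakeServer struct {
	objects map[string]int // key -> resource version of the last change
	rv      int            // current resource version
	log     []event        // change history, used to serve watches
}

// apply records a change on the server side.
func (s *fakeServer) apply(kind, key string) {
	s.rv++
	if kind == "DELETED" {
		delete(s.objects, key)
	} else {
		s.objects[key] = s.rv
	}
	s.log = append(s.log, event{kind, key, s.rv})
}

// list returns a full snapshot and the version it was taken at.
func (s *fakeServer) list() (map[string]int, int) {
	snap := make(map[string]int, len(s.objects))
	for k, v := range s.objects {
		snap[k] = v
	}
	return snap, s.rv
}

// watch returns only the events newer than sinceRV.
func (s *fakeServer) watch(sinceRV int) []event {
	var out []event
	for _, e := range s.log {
		if e.rv > sinceRV {
			out = append(out, e)
		}
	}
	return out
}

// reflector mirrors the server into a local store.
type reflector struct {
	store  map[string]int
	lastRV int // plays the role of lastSyncResourceVersion
}

// list replaces the whole local store with a full snapshot.
func (r *reflector) list(s *fakeServer) {
	r.store, r.lastRV = s.list()
}

// watch applies only the changes newer than lastRV.
func (r *reflector) watch(s *fakeServer) {
	for _, e := range s.watch(r.lastRV) {
		if e.kind == "DELETED" {
			delete(r.store, e.key)
		} else {
			r.store[e.key] = e.rv
		}
		r.lastRV = e.rv
	}
}

func main() {
	srv := &fakeServer{objects: map[string]int{}}
	srv.apply("ADDED", "ns1/pod1")
	srv.apply("ADDED", "ns1/pod2")

	r := &reflector{}
	r.list(srv) // full pull
	srv.apply("MODIFIED", "ns1/pod1")
	srv.apply("DELETED", "ns1/pod2")
	r.watch(srv) // incremental events only

	fmt.Println(len(r.store), r.lastRV)
}
```

The point of the sketch is the role of the last synchronized version: the watch only has to deliver what changed after the snapshot, so the full list is needed once (or again after a failure).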

The type check on watched objects is implemented with Go's reflect package as follows:

// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {

	...
	if r.expectedType != nil {
		if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
			utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
			continue
		}
	}
	if r.expectedGVK != nil {
		if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
			utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
			continue
		}
	}
	...
}

Because reflection is used to confirm the target resource type, the name Reflector is apt. The resource type to List/Watch is already determined by the ListerWatcher passed to NewSharedIndexInformer, but watchHandler compares the type of each watched object against the target type once more.
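The type comparison above can be tried in isolation with the standard reflect package. The following is a minimal sketch; `pod`, `node`, and `filterByType` are hypothetical names made up for this example and do not exist in client-go.

```go
package main

import (
	"fmt"
	"reflect"
)

// pod and node are hypothetical resource types.
type pod struct{ Name string }
type node struct{ Name string }

// filterByType keeps only the objects whose dynamic type equals the type of
// expected, and counts the ones it skips (the real code logs an error and
// continues with the next event).
func filterByType(expected interface{}, objs []interface{}) (kept []interface{}, skipped int) {
	want := reflect.TypeOf(expected)
	for _, o := range objs {
		if got := reflect.TypeOf(o); got != want {
			skipped++
			continue
		}
		kept = append(kept, o)
	}
	return kept, skipped
}

func main() {
	objs := []interface{}{&pod{Name: "pod1"}, &node{Name: "node1"}, &pod{Name: "pod2"}}
	kept, skipped := filterByType(&pod{}, objs)
	fmt.Println(len(kept), skipped)
}
```

Note that `*pod` and `pod` are different types to reflect, which is why the expected value is passed as a pointer, just as `&corev1.Pod{}` is later in this article.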

Getting to know DeltaFIFO

Let’s first look at the DeltaFIFO structure definition:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
	// Read and write locks and condition variables
	lock sync.RWMutex
	cond sync.Cond

	// KV storage: objKey1 -> Deltas[obj1-Added, obj1-Updated, ...]
	items map[string]Deltas

	// Store only all objKeys
	queue []string

	// Set to true once the first batch of objects has been queued through Replace(), or on the first call to Add/Update/Delete
	populated bool
	// Number of objects queued by the first call to Replace()
	initialPopulationCount int

	// keyFunc is used to get the corresponding objKey from an obj
	keyFunc KeyFunc

	// The known objects; in practice this is the Indexer
	knownObjects KeyListerGetter

	// Whether the queue is closed
	closed bool

	// Whether to emit the Replaced delta type instead of Sync (for compatibility with older versions)
	emitDeltaTypeReplaced bool
}

DeltaType can be divided into the following types:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced" // Initial list, or relist after a watch error
	Sync     DeltaType = "Sync"     // Periodic resync (older versions also used Sync for Replace)
)

Through queueActionLocked, DeltaFIFO appends each change to its local store and deduplicates adjacent deltas with dedupDeltas. The store consists of queue (objKeys only) and items (objKeys mapped to their Deltas). It is continuously consumed through Pop, which hands each popped item to Process(item).
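The interplay of queue and items can be illustrated with a much-reduced model. This is a sketch under assumptions, not the client-go implementation: `miniFIFO`, `enqueue`, and `pop` are hypothetical names, there is no locking or condition variable, objects are plain strings, and deduplication is reduced to collapsing two consecutive Deleted deltas.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object string
}

type Deltas []Delta

// miniFIFO is a simplified, single-goroutine stand-in for DeltaFIFO.
type miniFIFO struct {
	items map[string]Deltas // objKey -> pending deltas
	queue []string          // objKeys in arrival order
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string]Deltas{}}
}

// enqueue appends a delta for key. The key enters the queue only when it has
// no pending deltas yet, and a Deleted directly after a Deleted is dropped.
func (f *miniFIFO) enqueue(t DeltaType, key, obj string) {
	old, exists := f.items[key]
	if n := len(old); n > 0 && old[n-1].Type == Deleted && t == Deleted {
		return
	}
	f.items[key] = append(old, Delta{Type: t, Object: obj})
	if !exists {
		f.queue = append(f.queue, key)
	}
}

// pop removes the oldest key and passes all of its deltas to process.
// It reports false when the queue is empty.
func (f *miniFIFO) pop(process func(key string, ds Deltas)) bool {
	if len(f.queue) == 0 {
		return false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	ds := f.items[key]
	delete(f.items, key)
	process(key, ds)
	return true
}

func main() {
	f := newMiniFIFO()
	f.enqueue(Added, "ns1/pod1", "v1")
	f.enqueue(Added, "ns1/pod2", "v1")
	f.enqueue(Updated, "ns1/pod1", "v2")
	f.enqueue(Deleted, "ns1/pod2", "v1")
	f.enqueue(Deleted, "ns1/pod2", "v1") // duplicate deletion is dropped

	show := func(key string, ds Deltas) { fmt.Println(key, ds) }
	for f.pop(show) {
	}
}
```

Because a key is queued once and its deltas accumulate in items, a consumer that pops the key sees the whole history of changes for that object in order.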

(K8s – DeltaFIFO)

Indexer, the index

The resources obtained in the ListAndWatch step above are stored in the DeltaFIFO and then consumed from the queue by calling Pop. In practice, the Process function that handles popped items is implemented by sharedIndexInformer.HandleDeltas. HandleDeltas performs Add/Update/Delete on the Indexer according to each DeltaType above, and creates, updates, and deletes the corresponding index entries at the same time.

Specific index implementation is as follows:

// staging/src/k8s.io/client-go/tools/cache/index.go
// Map: index name => index function
type Indexers map[string]IndexFunc

// Map: index name => Index
type Indices map[string]Index

// Map: indexedValue => [objKey1, objKey2...]
type Index map[string]sets.String

IndexFunc: A function that evaluates an index, which allows the extension of many different index functions. The default and most commonly used index function is MetaNamespaceIndexFunc.

IndexedValue: Sometimes called an indexKey, it is an index value (such as ns1) computed by an IndexFunc.

Object key: The unique key (such as ns1/pod1) of an object obj; each key corresponds to one resource object.

(K8s – indexer)

As you can see, Indexer is built on the ThreadSafeStore interface, which is ultimately implemented by threadSafeMap.

Note the difference between an IndexFunc (such as MetaNamespaceIndexFunc) and a KeyFunc (such as MetaNamespaceKeyFunc): the former computes the index values of an object, while the latter computes its object key. Likewise, an indexKey (such as ns1) is a value calculated by an IndexFunc, while an objKey (such as ns1/pod1) is the unique key of an obj.
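The relationship between Indexers, Indices, Index, indexKey, and objKey can be made concrete with a small model. This is a sketch only: `miniIndexer`, `pod`, `keyFunc`, and `namespaceIndexFunc` are hypothetical names that mimic the roles of threadSafeMap, MetaNamespaceKeyFunc, and MetaNamespaceIndexFunc, and there is no locking.

```go
package main

import (
	"fmt"
	"sort"
)

// pod is a hypothetical minimal resource object.
type pod struct{ Namespace, Name string }

type indexFunc func(p pod) []string

// index maps an indexed value (e.g. "ns1") to a set of object keys.
type index map[string]map[string]struct{}

type miniIndexer struct {
	objs     map[string]pod       // objKey -> object
	indexers map[string]indexFunc // index name -> index function
	indices  map[string]index     // index name -> index
}

// keyFunc computes the object key, e.g. "ns1/pod1".
func keyFunc(p pod) string { return p.Namespace + "/" + p.Name }

// namespaceIndexFunc computes the indexed values for the namespace index.
func namespaceIndexFunc(p pod) []string { return []string{p.Namespace} }

func newMiniIndexer(indexers map[string]indexFunc) *miniIndexer {
	return &miniIndexer{objs: map[string]pod{}, indexers: indexers, indices: map[string]index{}}
}

// add stores the object and updates every index.
func (m *miniIndexer) add(p pod) {
	key := keyFunc(p)
	m.remove(key) // drop stale index entries if the object already exists
	m.objs[key] = p
	for name, fn := range m.indexers {
		if m.indices[name] == nil {
			m.indices[name] = index{}
		}
		for _, v := range fn(p) {
			if m.indices[name][v] == nil {
				m.indices[name][v] = map[string]struct{}{}
			}
			m.indices[name][v][key] = struct{}{}
		}
	}
}

// remove deletes the object and its index entries.
func (m *miniIndexer) remove(key string) {
	p, ok := m.objs[key]
	if !ok {
		return
	}
	for name, fn := range m.indexers {
		for _, v := range fn(p) {
			delete(m.indices[name][v], key)
			if len(m.indices[name][v]) == 0 {
				delete(m.indices[name], v)
			}
		}
	}
	delete(m.objs, key)
}

// byIndex returns the sorted object keys stored under an indexed value.
func (m *miniIndexer) byIndex(name, value string) []string {
	keys := make([]string, 0, len(m.indices[name][value]))
	for k := range m.indices[name][value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	m := newMiniIndexer(map[string]indexFunc{"namespace": namespaceIndexFunc})
	m.add(pod{"ns1", "pod1"})
	m.add(pod{"ns1", "pod2"})
	m.add(pod{"ns2", "pod3"})
	fmt.Println(m.byIndex("namespace", "ns1"))
}
```

A lookup by namespace touches only the key set stored under that indexed value instead of scanning every object, which is what makes listing by index cheap.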

Controller, the coordinator

As the core hub, Controller integrates the Reflector, DeltaFIFO, Indexer, and Store components above and serves as the bridge to downstream consumers.

The Controller interface is implemented by the controller struct:

In K8s, an interface with a capitalized name is typically implemented by a struct with the corresponding lowercase name.

// staging/src/k8s.io/client-go/tools/cache/controller.go
type controller struct {
	config         Config
	reflector      *Reflector // Components parsed above
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

type Config struct {
	// Actually implemented by DeltaFIFO
	Queue

	// Required to construct Reflector
	ListerWatcher

	// Pop out the obj handler function
	Process ProcessFunc

	// Target object type
	ObjectType runtime.Object

	// Full resynchronization interval
	FullResyncPeriod time.Duration

	// Whether to resynchronize
	ShouldResync ShouldResyncFunc

	// If true, re-queue the object when Process() returns an error
	RetryOnError bool

	// Callback invoked when Watch returns an error
	WatchErrorHandler WatchErrorHandler

	// Watch page size
	WatchListPageSize int64
}

Running the Controller's Run method in a goroutine starts Reflector's ListAndWatch(), which pulls the full set of resources from apiserver, watches for incremental changes, and stores them in the DeltaFIFO. Next, processLoop is started to continuously Pop and consume from the DeltaFIFO. In sharedIndexInformer, the function that handles popped items is HandleDeltas: on one hand it maintains the Indexer through Add/Update/Delete, and on the other it calls the downstream sharedProcessor to run the handlers.

Starting SharedInformer

The SharedInformer interface is embedded in the SharedIndexInformer interface, which is implemented by sharedIndexInformer.

Take a look at the structure definition:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	// The processor that runs the handlers, a focus of the analysis below
	processor *sharedProcessor

	// To check whether the cache has changed, one is used for debugging, default is off
	cacheMutationDetector MutationDetector

	// Required to construct Reflector
	listerWatcher ListerWatcher

	// The target type is given to Reflector to determine the resource type
	objectType runtime.Object

	// Reflector starts the resynchronization period
	resyncCheckPeriod time.Duration

	// If the consumer has not added a Resync time, the default Resync cycle is used
	defaultEventHandlerResyncPeriod time.Duration
	clock                           clock.Clock

	// Two bool indicates three states: controller before start, controller started, controller stopped
	started, stopped bool
	startedLock      sync.Mutex

	// When Pop is consuming the queue, the listener needs to be locked to prevent consumption chaos
	blockDeltas sync.Mutex

	// Watch returns the err callback
	watchErrorHandler WatchErrorHandler
}

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener // Listeners need to be synchronized
	clock            clock.Clock
	wg               wait.Group
}

As the struct definition shows, Reflector's ListAndWatch is run through the embedded controller (analyzed above), the results are stored in the DeltaFIFO, and consumption of the queue through Pop is started. In sharedIndexInformer, the function that handles popped items is HandleDeltas.

All listeners are added to the processorListener slice through sharedIndexInformer.AddEventHandler, which behaves differently depending on whether the controller has already started:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	...
	// If the informer has not started yet, just add the listener and return
	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// lock control
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	
	// Iterate over all objects and send to the listener just added
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

Then, in HandleDeltas, sharedProcessor.distribute is called according to the Delta type of the obj (Added/Updated/Deleted/Replaced/Sync) to dispatch the notification to all listeners.

Registering with SharedInformerFactory

SharedInformerFactory is the factory for SharedInformer and offers a factory design with high cohesion and low coupling. Its definition is as follows:

// staging/src/k8s.io/client-go/informers/factory.go
type SharedInformerFactory interface {
	internalinterfaces.SharedInformerFactory // Focus on the internal interface
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

	Admissionregistration() admissionregistration.Interface
	Internal() apiserverinternal.Interface
	Apps() apps.Interface
	Autoscaling() autoscaling.Interface
	Batch() batch.Interface
	Certificates() certificates.Interface
	Coordination() coordination.Interface
	Core() core.Interface
	Discovery() discovery.Interface
	Events() events.Interface
	Extensions() extensions.Interface
	Flowcontrol() flowcontrol.Interface
	Networking() networking.Interface
	Node() node.Interface
	Policy() policy.Interface
	Rbac() rbac.Interface
	Scheduling() scheduling.Interface
	Storage() storage.Interface
}

// staging/src/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
	Start(stopCh <-chan struct{}) // Starts SharedIndexInformer.Run
	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer // Initializes the informer for the target type
}

Take PodInformer as an example to illustrate how users build their own Informer. PodInformer is defined as follows:

// staging/src/k8s.io/client-go/informers/core/v1/pod.go
type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}

It is implemented by the lowercase podInformer (once again, the K8s style of an uppercase interface with a lowercase implementation):

type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *podInformer) Lister() v1.PodLister {
	return v1.NewPodLister(f.Informer().GetIndexer())
}

The user passes in the target type (&corev1.Pod{}) and the constructor (defaultInformer), and calls SharedInformerFactory.InformerFor to register the target Informer. Calling SharedInformerFactory.Start then runs the SharedIndexInformer -> Controller -> Reflector -> DeltaFIFO flow analyzed above.

The high-cohesion, low-coupling design of SharedInformerFactory comes from having the user pass in the target type and a constructor to register an Informer.
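The registration idea can be sketched as a registry keyed by the reflected type of the target object, so that repeated requests for the same type share one informer. The names below (`miniFactory`, `informer`, `informerFor`, `start`) are hypothetical and heavily simplified; the real factory manages real informers and runs them in goroutines.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// informer is a hypothetical stand-in for cache.SharedIndexInformer.
type informer struct {
	kind    string
	started bool
}

// newInformerFunc is the constructor the caller registers for a type.
type newInformerFunc func() *informer

// miniFactory keeps at most one informer per object type.
type miniFactory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*informer
}

func newMiniFactory() *miniFactory {
	return &miniFactory{informers: map[reflect.Type]*informer{}}
}

// informerFor returns the shared informer for obj's type, constructing it
// with newFunc only on the first request.
func (f *miniFactory) informerFor(obj interface{}, newFunc newInformerFunc) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()

	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// start marks every registered informer that is not running yet as started
// and returns how many were newly started.
func (f *miniFactory) start() int {
	f.mu.Lock()
	defer f.mu.Unlock()

	n := 0
	for _, inf := range f.informers {
		if !inf.started {
			inf.started = true
			n++
		}
	}
	return n
}

// pod and node are hypothetical resource types.
type pod struct{}
type node struct{}

func main() {
	f := newMiniFactory()
	newPodInformer := func() *informer { return &informer{kind: "Pod"} }

	a := f.informerFor(&pod{}, newPodInformer)
	b := f.informerFor(&pod{}, newPodInformer) // same type: shared instance
	f.informerFor(&node{}, func() *informer { return &informer{kind: "Node"} })

	fmt.Println(a == b, f.start(), f.start())
}
```

The factory never needs to know about concrete resource types: callers bring the type and the constructor, and the factory only deduplicates and starts what was registered.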

processorListener, the callback

All listeners are implemented by processorListener and are divided into two groups, listeners and syncingListeners. distribute iterates over the listeners of the relevant group and hands the data to each processorListener for processing.

Note that listeners may have different resyncPeriod settings: all registered listeners are kept in listeners, while syncingListeners holds the ones that are due for a resync. If resyncPeriod is set in more than one place (sharedIndexInformer.resyncCheckPeriod, sharedIndexInformer.AddEventHandlerWithResyncPeriod), the smallest value is taken, with minimumResyncPeriod as the lower bound.

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

As the source code shows, processorListener cleverly combines two channels (addCh, nextCh) with a pendingNotifications buffer (a growable ring implemented on a slice), whose default size is initialBufferSize = 1024. This achieves efficient data transfer without blocking either upstream or downstream processing, and is worth learning from.
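The two-channel-plus-buffer idea can be reproduced with the standard library alone. The sketch below is an assumption-laden simplification: `pump` is a hypothetical function, the buffer is a plain slice rather than a growable ring, notifications are ints, and the backlog is drained when the input closes so that the example is deterministic.

```go
package main

import "fmt"

// pump forwards values from in to out. Values that out is not ready to take
// are parked in an unbounded slice, so the sender on in never waits for the
// consumer on out.
func pump(in <-chan int, out chan<- int) {
	defer close(out)

	var pending []int
	for in != nil || len(pending) > 0 {
		// A nil channel disables its select case, so the send case is
		// only active while there is something to deliver.
		var send chan<- int
		var next int
		if len(pending) > 0 {
			send = out
			next = pending[0]
		}

		select {
		case send <- next:
			pending = pending[1:]
		case v, ok := <-in:
			if !ok {
				in = nil // stop receiving, keep draining the backlog
				continue
			}
			pending = append(pending, v)
		}
	}
}

func main() {
	in, out := make(chan int), make(chan int)
	go pump(in, out)

	for i := 0; i < 5; i++ {
		in <- i // returns promptly even though nobody reads out yet
	}
	close(in)

	for v := range out {
		fmt.Println(v)
	}
}
```

The design choice worth noting is the use of a nil channel to switch a select case on and off, which lets a single goroutine accept new notifications and deliver old ones without either side blocking the other.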

(K8s – processorListener)

Keeping the workqueue busy

The processorListener callback hands each event to the registered ResourceEventHandler for the actual create/update/delete (CUD) handling, invoking the registered OnAdd/OnUpdate/OnDelete functions respectively.

To process events quickly without blocking the processorListener callback, a workqueue is typically used for asynchronous decoupling, as follows:

(K8s – workqueue)

As the figure shows, workqueue.RateLimitingInterface embeds DelayingInterface, which in turn embeds Interface, and it is ultimately implemented by rateLimitingType. It provides three core capabilities: rate limiting (rateLimit), delayed enqueueing (delay, implemented as a priority queue on a min-heap), and queue processing (queue).

In addition, the code shows that K8s implements three types of RateLimiter: BucketRateLimiter, ItemExponentialFailureRateLimiter, and ItemFastSlowRateLimiter. By default a controller uses the first two, as follows:

// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}

In this way, users can call the workqueue methods for flexible queue handling, such as giving up retries after a number of failures, controlling how long an item is delayed before it is enqueued, and limiting the queue rate (QPS), to achieve non-blocking asynchronous processing.

Summary

This article analyzed the implementation of Informer by walking through its components: Reflector, DeltaFIFO, Indexer, Controller, SharedInformer, processorListener, and workqueue (the event-processing work queue). The related flows were illustrated with source code and diagrams to give a better understanding of how the K8s Informer runs.

As can be seen, to keep its core flows efficient and non-blocking, K8s makes heavy use of goroutines, channels, queues, indexes, and map-based deduplication. Its well-designed interfaces open up a great deal of extensibility for users, and its consistent naming of interfaces and implementations is also worth learning from.

PS: For more content, follow k8s-club on GitHub: github.com/k8s-club/k8…

