The author
Wang Cheng is a Tencent Cloud R&D engineer and Kubernetes contributor. He works on containerizing database products and on resource control, and follows Kubernetes, Go, and the cloud native field.
Overview
Step into the K8s world and you will find many controllers, each responsible for reconciling certain types of resources (for example, Pods are managed through DeploymentController and ReplicaSetController) so that the actual state matches the state the user declared.
K8s has dozens of resource types. Informer exists so that users inside and outside K8s can obtain changes to a given resource type conveniently and efficiently. This article walks through Reflector, DeltaFIFO, Indexer, Controller, SharedInformer, processorListener, and workqueue (the work queue for event processing) in turn.
This article and the articles that follow are based on K8s v1.22.
(K8s – informer)
Starting with Reflector
Reflector's main job is to list resources from apiserver and then continuously watch for Add/Update/Delete events, storing the results in a local Store implemented by DeltaFIFO.
First take a look at the Reflector structure definition:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {
	// Name of this reflector; identified by file:line
	name string
	// The following three fields identify the expected object type
	expectedTypeName string
	expectedType     reflect.Type
	expectedGVK      *schema.GroupVersionKind
	// Storage interface; in practice implemented by DeltaFIFO
	store Store
	// Used to list the full set and watch increments from apiserver
	listerWatcher ListerWatcher
	// The following two fields are used for retries after failures
	backoffManager         wait.BackoffManager
	initConnBackoffManager wait.BackoffManager
	// Resync interval requested by the informer user
	resyncPeriod time.Duration
	// Reports whether the conditions for a resync are met
	ShouldResync func() bool
	clock        clock.Clock
	// Whether to paginate List calls
	paginatedResult bool
	// resourceVersion of the last sync; watch only receives changes newer than this value
	lastSyncResourceVersion string
	// Whether the last synced resourceVersion is unavailable
	isLastSyncResourceVersionUnavailable bool
	// Lock guarding the resourceVersion fields
	lastSyncResourceVersionMutex sync.RWMutex
	// Page size for List
	WatchListPageSize int64
	// Callback invoked when watch fails
	watchErrorHandler WatchErrorHandler
}
As the struct shows, ListAndWatch targets a specific resource type, and pagination can be configured.
After the first full List of the target resource type, syncWith calls Replace to load the whole result into DeltaFIFO's queue/items. The Reflector then keeps watching for incremental events, which are deduplicated and added to DeltaFIFO's queue/items to wait for consumption.
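The list-then-watch flow can be sketched with a toy in-memory server. This is a simplified model under stated assumptions, not client-go code: `fakeServer`, `event`, and `listAndWatch` are hypothetical names, resourceVersions are plain integers, and the watch is a finite slice rather than a stream.

```go
package main

import "fmt"

// event is a hypothetical, simplified watch event.
type event struct {
	Type            string // "Added", "Modified" or "Deleted"
	Key             string
	ResourceVersion int
}

// fakeServer stands in for apiserver: a snapshot plus an ordered event log.
type fakeServer struct {
	snapshot        map[string]int // objKey -> resourceVersion at list time
	snapshotVersion int
	log             []event
}

// list returns a copy of the full state and the version it corresponds to.
func (s *fakeServer) list() (map[string]int, int) {
	out := make(map[string]int, len(s.snapshot))
	for k, v := range s.snapshot {
		out[k] = v
	}
	return out, s.snapshotVersion
}

// watch returns only the events newer than the given resourceVersion.
func (s *fakeServer) watch(since int) []event {
	var out []event
	for _, e := range s.log {
		if e.ResourceVersion > since {
			out = append(out, e)
		}
	}
	return out
}

// listAndWatch replaces the local store with the full list, then applies the
// incremental events while tracking the last synced resourceVersion.
func listAndWatch(s *fakeServer) (store map[string]int, lastRV int) {
	store, lastRV = s.list() // full replace
	for _, e := range s.watch(lastRV) {
		switch e.Type {
		case "Added", "Modified":
			store[e.Key] = e.ResourceVersion
		case "Deleted":
			delete(store, e.Key)
		}
		lastRV = e.ResourceVersion
	}
	return store, lastRV
}

func main() {
	s := &fakeServer{
		snapshot:        map[string]int{"ns1/pod1": 3, "ns1/pod2": 5},
		snapshotVersion: 5,
		log: []event{
			{"Added", "ns1/pod2", 5},    // already covered by the list
			{"Modified", "ns1/pod1", 6}, // newer than the list
			{"Deleted", "ns1/pod2", 7},
		},
	}
	store, rv := listAndWatch(s)
	fmt.Println(store, rv)
}
```

Starting the watch from the version returned by the list is what lets the consumer skip events it has already seen.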
The type check on watched objects is implemented with Go's reflect package, as follows:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	...
	if r.expectedType != nil {
		if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
			utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
			continue
		}
	}
	if r.expectedGVK != nil {
		if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
			utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
			continue
		}
	}
	...
}
Reflection is used to verify the target resource type, which makes Reflector a fitting name. The resource type to List/Watch is already determined by the ListerWatcher passed to NewSharedIndexInformer, but watchHandler still compares each watched object against the expected type.
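A minimal, standalone sketch of the same kind of reflect-based comparison follows. `Pod`, `Service`, and `checkType` are hypothetical names used only for illustration; they are not the client-go types.

```go
package main

import (
	"fmt"
	"reflect"
)

// Pod and Service are hypothetical stand-ins for API object types.
type Pod struct{ Name string }
type Service struct{ Name string }

// checkType returns an error when obj does not have the same dynamic type as
// expected, similar in spirit to the expectedType comparison in watchHandler.
func checkType(expected, obj interface{}) error {
	e, a := reflect.TypeOf(expected), reflect.TypeOf(obj)
	if e != a {
		return fmt.Errorf("expected type %v, but watch event object had type %v", e, a)
	}
	return nil
}

func main() {
	fmt.Println(checkType(&Pod{}, &Pod{Name: "pod1"}))
	fmt.Println(checkType(&Pod{}, &Service{Name: "svc1"}))
}
```

Note that `*Pod` and `Pod` are different types to reflect, so an expected pointer type only matches pointer objects.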
Getting to know DeltaFIFO
Let’s first look at the DeltaFIFO structure definition:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
	// Read-write lock and condition variable
	lock sync.RWMutex
	cond sync.Cond
	// KV storage: objKey1 -> Deltas[obj1-Added, obj1-Updated, ...]
	items map[string]Deltas
	// Holds only the objKeys
	queue []string
	// True once the first batch of objects has been queued through Replace(), or once Add/Update/Delete has been called for the first time
	populated bool
	// Number of objects queued by the first call to Replace()
	initialPopulationCount int
	// keyFunc computes the objKey for an obj
	keyFunc KeyFunc
	// The known objects; in practice this is the Indexer
	knownObjects KeyListerGetter
	// Whether the queue is closed
	closed bool
	// Whether Replace() emits the Replaced delta type (otherwise Sync, for compatibility with older versions)
	emitDeltaTypeReplaced bool
}
DeltaType has the following values:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType string
const (
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
	Replaced DeltaType = "Replaced" // Initial list or relist
	Sync     DeltaType = "Sync"     // Periodic resync; older versions also used Sync where Replaced is used now
)
DeltaFIFO is the local store behind the Reflector. New deltas are enqueued through queueActionLocked, and duplicates are merged by dedupDeltas. It consists of queue (objKeys only) and items (objKeys mapped to their Deltas), is continuously consumed through Pop, and each popped item is handled by Process(item).
(K8s – DeltaFIFO)
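The queue/items pairing can be modeled in a few lines. The sketch below is a hypothetical single-goroutine model (`miniDeltaFIFO`, `enqueue`, and `pop` are illustrative names): it has no locking, no condition variable, and no knownObjects, and its deduplication only collapses two consecutive Deleted deltas, which is an assumption about the simplest case rather than a copy of dedupDeltas.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

// Delta is simplified: the object is a string instead of interface{}.
type Delta struct {
	Type   DeltaType
	Object string
}

// miniDeltaFIFO is a hypothetical, single-goroutine model of DeltaFIFO.
type miniDeltaFIFO struct {
	items map[string][]Delta // objKey -> accumulated deltas
	queue []string           // objKeys in arrival order, each at most once
}

func newMiniDeltaFIFO() *miniDeltaFIFO {
	return &miniDeltaFIFO{items: map[string][]Delta{}}
}

// enqueue appends a delta for key. Two consecutive Deleted deltas are
// collapsed into one.
func (f *miniDeltaFIFO) enqueue(key string, d Delta) {
	deltas := f.items[key]
	if n := len(deltas); n > 0 && deltas[n-1].Type == Deleted && d.Type == Deleted {
		return
	}
	if len(deltas) == 0 {
		f.queue = append(f.queue, key) // a key enters the queue only once
	}
	f.items[key] = append(deltas, d)
}

// pop removes the oldest key and hands all of its deltas to process.
// It returns false when the queue is empty (a blocking queue would wait).
func (f *miniDeltaFIFO) pop(process func(key string, deltas []Delta)) bool {
	if len(f.queue) == 0 {
		return false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	process(key, deltas)
	return true
}

func main() {
	f := newMiniDeltaFIFO()
	f.enqueue("ns1/pod1", Delta{Added, "pod1-v1"})
	f.enqueue("ns1/pod2", Delta{Added, "pod2-v1"})
	f.enqueue("ns1/pod1", Delta{Updated, "pod1-v2"})

	printDeltas := func(key string, deltas []Delta) { fmt.Println(key, deltas) }
	for f.pop(printDeltas) {
	}
}
```

Because a key is queued once while its deltas accumulate in items, a consumer that falls behind still receives every change for an object in a single Pop.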
Indexer: the index
The resources obtained by ListAndWatch in the previous step are stored in DeltaFIFO and then consumed from the queue by calling Pop. In practice, the Process function that handles each popped item is sharedIndexInformer.HandleDeltas. HandleDeltas performs Add/Update/Delete on the Indexer according to each DeltaType above, creating, updating, and deleting the corresponding index entries at the same time.
The index is implemented as follows:
// staging/src/k8s.io/client-go/tools/cache/index.go
// Map: index name => index function
type Indexers map[string]IndexFunc
// Map: index name => Index
type Indices map[string]Index
// Map: indexedValue => [objKey1, objKey2...]
type Index map[string]sets.String
IndexFunc: a function that computes index values for an object. Users can plug in their own index functions; the default and most commonly used one is MetaNamespaceIndexFunc.
IndexedValue: sometimes called an indexKey, it is an index value (such as ns1) computed by an IndexFunc.
Object key: the unique key (such as ns1/pod1) of an object obj; each key corresponds to one resource object.
(K8s – indexer)
As you can see, Indexer is built on the ThreadSafeStore interface, which is ultimately implemented by threadSafeMap.
In short: an IndexFunc (for example MetaNamespaceIndexFunc) computes an indexKey (for example ns1) from an object, while a key function such as MetaNamespaceKeyFunc computes the object's unique objKey (for example ns1/pod1).
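The three-level map structure can be exercised with a small standalone sketch. It is a simplification under stated assumptions: the index function here receives the objKey string rather than the object, plain map sets replace sets.String, and `namespaceIndexFunc`, `addToIndices`, and `byIndex` are hypothetical helpers, not client-go functions.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// IndexFunc computes index values. Simplified: it takes the objKey string.
type IndexFunc func(objKey string) []string

type Index map[string]map[string]struct{} // indexedValue -> set of objKeys
type Indexers map[string]IndexFunc        // index name -> index function
type Indices map[string]Index             // index name -> Index

// namespaceIndexFunc is a hypothetical stand-in for a namespace index
// function: it derives the namespace from a "namespace/name" key.
func namespaceIndexFunc(objKey string) []string {
	if i := strings.Index(objKey, "/"); i >= 0 {
		return []string{objKey[:i]}
	}
	return []string{""}
}

// addToIndices records objKey under every value each IndexFunc returns.
func addToIndices(indexers Indexers, indices Indices, objKey string) {
	for name, fn := range indexers {
		if indices[name] == nil {
			indices[name] = Index{}
		}
		for _, v := range fn(objKey) {
			if indices[name][v] == nil {
				indices[name][v] = map[string]struct{}{}
			}
			indices[name][v][objKey] = struct{}{}
		}
	}
}

// byIndex returns the sorted objKeys stored under indexedValue.
func byIndex(indices Indices, name, indexedValue string) []string {
	var keys []string
	for k := range indices[name][indexedValue] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	indexers := Indexers{"namespace": namespaceIndexFunc}
	indices := Indices{}
	for _, key := range []string{"ns1/pod1", "ns1/pod2", "ns2/pod3"} {
		addToIndices(indexers, indices, key)
	}
	fmt.Println(byIndex(indices, "namespace", "ns1"))
}
```

Looking up all objects in a namespace then costs one map access per level instead of a scan over every cached object.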
Controller: the hub
As the core hub, Controller integrates the components above (Reflector, DeltaFIFO, Indexer, and Store) and acts as the bridge to downstream consumers.
The Controller interface is implemented by the controller struct:
In K8s, an interface named with an uppercase initial is typically implemented by a struct with the same name in lowercase.
// staging/src/k8s.io/client-go/tools/cache/controller.go
type controller struct {
	config         Config
	reflector      *Reflector // The component analyzed above
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}
type Config struct {
	// In practice implemented by DeltaFIFO
	Queue
	// Required to construct the Reflector
	ListerWatcher
	// Function that handles each obj popped from the queue
	Process ProcessFunc
	// Target object type
	ObjectType runtime.Object
	// Full resync interval
	FullResyncPeriod time.Duration
	// Reports whether a resync should happen
	ShouldResync ShouldResyncFunc
	// If true, the obj is re-queued when Process() returns an error
	RetryOnError bool
	// Callback invoked when watch returns an error
	WatchErrorHandler WatchErrorHandler
	// Page size for List
	WatchListPageSize int64
}
Running the controller's Run method in a goroutine starts Reflector's ListAndWatch(), which lists the full set of resources from apiserver, watches for increments, and stores both in DeltaFIFO. Run then starts processLoop, which continuously consumes from DeltaFIFO through Pop. In sharedIndexInformer, the function that processes each popped item is HandleDeltas: on one hand it keeps the Indexer up to date through Add/Update/Delete, and on the other hand it calls the downstream sharedProcessor so that the handlers can process the event.
Starting SharedInformer
The SharedInformer interface is embedded in the SharedIndexInformer interface, which is implemented by the sharedIndexInformer struct.
Take a look at the structure definition:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller
	// Dispatches events to the handlers; discussed in detail below
	processor *sharedProcessor
	// Detects whether cached objects have been mutated; used for debugging, off by default
	cacheMutationDetector MutationDetector
	// Required to construct the Reflector
	listerWatcher ListerWatcher
	// Target type, handed to the Reflector to determine the resource type
	objectType runtime.Object
	// How often the Reflector checks whether a resync is due
	resyncCheckPeriod time.Duration
	// Default resync period for handlers added without an explicit one
	defaultEventHandlerResyncPeriod time.Duration
	clock                           clock.Clock
	// Two bools encode three states: before the controller starts, started, and stopped
	started, stopped bool
	startedLock      sync.Mutex
	// Held while Pop is consuming the queue, so that a listener can be added without disturbing consumption
	blockDeltas sync.Mutex
	// Callback invoked when watch returns an error
	watchErrorHandler WatchErrorHandler
}
type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener // Listeners that need a resync
	clock            clock.Clock
	wg               wait.Group
}
As the struct definition shows, sharedIndexInformer runs Reflector's ListAndWatch through its controller (analyzed above), stores the results in DeltaFIFO, and starts consuming the queue through Pop. In sharedIndexInformer, the function that processes popped items is HandleDeltas.
Every listener is added to the processorListener slice through sharedIndexInformer.AddEventHandler, which behaves differently depending on whether the informer has already started:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	...
	// If the informer has not started yet, just add the listener and return
	if !s.started {
		s.processor.addListener(listener)
		return
	}
	// Take the lock so no deltas are processed while the listener joins
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	s.processor.addListener(listener)
	// Iterate over all objects in the indexer and send them to the newly added listener
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
Then, in HandleDeltas, sharedProcessor.distribute is called according to the type of each Delta (Added/Updated/Deleted/Replaced/Sync) to send the event to all registered listeners.
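How a batch of deltas turns into store changes plus notifications can be sketched as follows. This is an illustrative model, not the client-go function: a plain map stands in for the Indexer, notifications are strings, and `handleDeltas` with its `Key`/`Obj` fields is a hypothetical shape chosen for brevity.

```go
package main

import "fmt"

type DeltaType string

const (
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync     DeltaType = "Sync"
)

// Delta is simplified: it carries the object key and a string payload.
type Delta struct {
	Type DeltaType
	Key  string
	Obj  string
}

// handleDeltas applies deltas to a local store (standing in for the Indexer)
// and returns the notifications that would be distributed to listeners.
func handleDeltas(store map[string]string, deltas []Delta) []string {
	var notes []string
	for _, d := range deltas {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			// Whether this is an add or an update depends on the store,
			// not on the delta type.
			if old, exists := store[d.Key]; exists {
				store[d.Key] = d.Obj
				notes = append(notes, fmt.Sprintf("update %s: %s -> %s", d.Key, old, d.Obj))
			} else {
				store[d.Key] = d.Obj
				notes = append(notes, fmt.Sprintf("add %s: %s", d.Key, d.Obj))
			}
		case Deleted:
			delete(store, d.Key)
			notes = append(notes, fmt.Sprintf("delete %s", d.Key))
		}
	}
	return notes
}

func main() {
	store := map[string]string{}
	notes := handleDeltas(store, []Delta{
		{Added, "ns1/pod1", "v1"},
		{Updated, "ns1/pod1", "v2"},
		{Deleted, "ns1/pod1", "v2"},
	})
	for _, n := range notes {
		fmt.Println(n)
	}
}
```

Deciding add versus update from the store's content keeps the cache and the notifications consistent even when a Sync or Replaced delta arrives for an object the cache has never seen.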
Registering with SharedInformerFactory
SharedInformerFactory is the factory for SharedInformer, a factory-pattern design with high cohesion and low coupling. It is defined as follows:
// staging/src/k8s.io/client-go/informers/factory.go
type SharedInformerFactory interface {
	internalinterfaces.SharedInformerFactory // Focus on the internal interface
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
	Admissionregistration() admissionregistration.Interface
	Internal() apiserverinternal.Interface
	Apps() apps.Interface
	Autoscaling() autoscaling.Interface
	Batch() batch.Interface
	Certificates() certificates.Interface
	Coordination() coordination.Interface
	Core() core.Interface
	Discovery() discovery.Interface
	Events() events.Interface
	Extensions() extensions.Interface
	Flowcontrol() flowcontrol.Interface
	Networking() networking.Interface
	Node() node.Interface
	Policy() policy.Interface
	Rbac() rbac.Interface
	Scheduling() scheduling.Interface
	Storage() storage.Interface
}
// staging/src/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go
type SharedInformerFactory interface {
	Start(stopCh <-chan struct{}) // Starts SharedIndexInformer.Run
	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer // Initializes the informer for the target type
}
Take PodInformer as an example to illustrate how users build their own Informer. PodInformer is defined as follows:
// staging/src/k8s.io/client-go/informers/core/v1/pod.go
type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}
It is implemented by the lowercase podInformer (again the K8s style of an uppercase interface with a lowercase implementation):
type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *podInformer) Lister() v1.PodLister {
	return v1.NewPodLister(f.Informer().GetIndexer())
}
The user passes in the target type (&corev1.Pod{}) and the constructor (defaultInformer), and SharedInformerFactory.InformerFor registers the target Informer. Calling SharedInformerFactory.Start then runs it, which kicks off the SharedIndexInformer -> Controller -> Reflector -> DeltaFIFO flow analyzed above.
Registering an Informer by passing in the target type and a constructor is what gives SharedInformerFactory its high-cohesion, low-coupling design.
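The register-by-type idea can be illustrated with a small registry keyed by reflect.Type. Everything here is a hypothetical model (`miniFactory`, `informer`, `informerFor`, `start`, and the empty `Pod`/`Service` types); it shows only the sharing behavior and launches no goroutines.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// Pod and Service are hypothetical stand-ins for API object types.
type Pod struct{}
type Service struct{}

// informer is a hypothetical placeholder for a shared informer.
type informer struct {
	kind    string
	started bool
}

// miniFactory keeps at most one informer per object type.
type miniFactory struct {
	mu        sync.Mutex
	informers map[reflect.Type]*informer
}

func newMiniFactory() *miniFactory {
	return &miniFactory{informers: map[reflect.Type]*informer{}}
}

// informerFor returns the informer registered for obj's type, building it
// with newFunc only on the first request so that later callers share it.
func (f *miniFactory) informerFor(obj interface{}, newFunc func() *informer) *informer {
	f.mu.Lock()
	defer f.mu.Unlock()
	t := reflect.TypeOf(obj)
	if inf, ok := f.informers[t]; ok {
		return inf
	}
	inf := newFunc()
	f.informers[t] = inf
	return inf
}

// start marks every registered informer that is not running yet as started
// and returns how many it started.
func (f *miniFactory) start() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	n := 0
	for _, inf := range f.informers {
		if !inf.started {
			inf.started = true
			n++
		}
	}
	return n
}

func main() {
	f := newMiniFactory()
	built := 0
	newPodInformer := func() *informer { built++; return &informer{kind: "Pod"} }

	a := f.informerFor(&Pod{}, newPodInformer)
	b := f.informerFor(&Pod{}, newPodInformer)
	f.informerFor(&Service{}, func() *informer { return &informer{kind: "Service"} })

	fmt.Println(a == b, built)
	fmt.Println(f.start())
}
```

Because the factory only sees a type and a constructor, it needs no knowledge of any concrete resource, which is where the low coupling comes from.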
processorListener: the callbacks
Every listener is implemented by processorListener. sharedProcessor keeps them in two groups, listeners and syncingListeners; it iterates over the listeners in the relevant group and hands the data to each processorListener for processing.
Note that the listeners may have different resyncPeriod settings: listeners holds every registered listener, while syncingListeners holds those that are due for a resync. If resyncPeriod is set in more than one place (sharedIndexInformer.resyncCheckPeriod, sharedIndexInformer.AddEventHandlerWithResyncPeriod), the smaller value is generally the one that takes effect, with minimumResyncPeriod as the lower bound.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}
In the source code, processorListener combines two channels (addCh and nextCh) with pendingNotifications, a growable ring buffer backed by a slice whose default size is initialBufferSize = 1024. This design moves data efficiently without blocking either the upstream producer or the downstream handlers, and is worth learning from.
(K8s – processorListener)
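The two-channel-plus-buffer technique can be sketched in isolation. The code below is a simplified model with hypothetical names (`pump`, `sendAllThenReceive`): it forwards ints, uses a plain slice instead of a ring buffer, and, as a sketch-only choice, flushes buffered values when the input closes.

```go
package main

import "fmt"

// pump forwards values from addCh to nextCh without ever blocking the
// sender: values the consumer has not taken yet wait in a growable slice.
func pump(addCh <-chan int, nextCh chan<- int) {
	defer close(nextCh)
	var pending []int  // stands in for pendingNotifications
	var out chan<- int // nil while there is nothing to send
	var next int
	for {
		select {
		case out <- next: // never selected while out is nil
			if len(pending) > 0 {
				next, pending = pending[0], pending[1:]
			} else {
				out = nil
			}
		case v, ok := <-addCh:
			if !ok {
				// Sketch-only choice: flush what is left before exiting.
				if out != nil {
					nextCh <- next
				}
				for _, p := range pending {
					nextCh <- p
				}
				return
			}
			if out == nil {
				next, out = v, nextCh // nothing waiting: send v directly
			} else {
				pending = append(pending, v)
			}
		}
	}
}

// sendAllThenReceive pushes every value before reading any, then collects
// what comes out of the pump.
func sendAllThenReceive(values []int) []int {
	addCh := make(chan int)  // unbuffered
	nextCh := make(chan int) // unbuffered
	go pump(addCh, nextCh)
	for _, v := range values {
		addCh <- v
	}
	close(addCh)
	var got []int
	for v := range nextCh {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(sendAllThenReceive([]int{1, 2, 3, 4, 5}))
}
```

Setting the outgoing channel variable to nil disables that select case, so the loop never spins or blocks when there is nothing to deliver.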
The busy workqueue
The processorListener callback passes each event to the registered ResourceEventHandler, which does the real create/update/delete (CUD) handling by calling the registered OnAdd/OnUpdate/OnDelete functions.
To return quickly and avoid blocking the processorListener callback, handlers typically use a workqueue for asynchronous decoupling, as follows:
(K8s – workqueue)
As shown above, workqueue.RateLimitingInterface embeds DelayingInterface, DelayingInterface embeds Interface, and the whole is ultimately implemented by rateLimitingType. It provides three core capabilities: rate limiting (rateLimit), delayed enqueueing (delay, implemented as a priority queue on a min-heap), and queue processing (queue).
In addition, the code shows that K8s implements three types of RateLimiter: BucketRateLimiter, ItemExponentialFailureRateLimiter, and ItemFastSlowRateLimiter. By default, controllers use the first two, as follows:
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
In this way, users can call workqueue methods to process the queue flexibly, for example to stop retrying after a given number of failures, to control how long an item is delayed before it is enqueued, or to limit the queue rate (QPS), and so achieve non-blocking asynchronous processing.
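The per-item exponential backoff and the "take the slowest limiter" combination can be sketched with the standard library alone. The types and functions below (`itemBackoff`, `newDefaultItemBackoff`, `when`, `forget`, `maxOf`) are hypothetical simplifications without locking; only the 5 ms base delay and 1000 s cap are taken from the defaults shown above.

```go
package main

import (
	"fmt"
	"time"
)

// itemBackoff is a hypothetical, simplified per-item exponential limiter.
type itemBackoff struct {
	failures  map[string]int
	baseDelay time.Duration
	maxDelay  time.Duration
}

// newDefaultItemBackoff uses the 5ms base and 1000s cap from the defaults.
func newDefaultItemBackoff() *itemBackoff {
	return &itemBackoff{
		failures:  map[string]int{},
		baseDelay: 5 * time.Millisecond,
		maxDelay:  1000 * time.Second,
	}
}

// when returns baseDelay * 2^failures for the item, capped at maxDelay, and
// counts this call as one more failure.
func (b *itemBackoff) when(item string) time.Duration {
	n := b.failures[item]
	b.failures[item] = n + 1
	delay := b.baseDelay
	for i := 0; i < n; i++ {
		delay *= 2
		if delay >= b.maxDelay {
			return b.maxDelay // stop early, which also avoids overflow
		}
	}
	return delay
}

// forget resets the failure count once the item has been handled.
func (b *itemBackoff) forget(item string) {
	delete(b.failures, item)
}

// maxOf returns the longest of the given delays: when several limiters are
// combined this way, the slowest one wins.
func maxOf(delays ...time.Duration) time.Duration {
	var longest time.Duration
	for _, d := range delays {
		if d > longest {
			longest = d
		}
	}
	return longest
}

func main() {
	b := newDefaultItemBackoff()
	fmt.Println(b.when("ns1/pod1"), b.when("ns1/pod1"), b.when("ns1/pod1"))
	b.forget("ns1/pod1")
	fmt.Println(b.when("ns1/pod1"))
	// Combine with a hypothetical 100ms delay from an overall limiter.
	fmt.Println(maxOf(b.when("ns1/pod2"), 100*time.Millisecond))
}
```

A typical worker would call the equivalent of `when` on every failed attempt and `forget` on success, so that healthy items are never delayed by their past failures.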
Summary
This article analyzed the implementation of Informer by walking through Reflector, DeltaFIFO, Indexer, Controller, SharedInformer, processorListener, workqueue (the work queue for event processing), and other components, illustrating the processing flow with source code and diagrams to give a better understanding of how K8s Informer runs.
As we have seen, to keep the core flow efficient and non-blocking, K8s makes heavy use of goroutines, channels, queues, indexes, and map-based deduplication. Its well-designed interfaces give users plenty of room for extension, and its consistent naming of interfaces and implementations is also worth learning from.
PS: For more content, follow k8s-club on GitHub: github.com/k8s-club/k8…