Goal
How does the Informer handle resource changes once it has discovered them?
Contents
- The consumption process
- The Index data structure
- How events are distributed
- Summary: how the Informer fits together
Process
func (c *controller) processLoop(a) {
for {
// Pop out the Object element
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
iferr ! =nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// re-queue
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
// Next, look at the implementation of Pop
// Pop removes an item from f.items, runs process on it, and returns the item
// together with the error returned by process. The whole call runs while
// f.lock is held.
// NOTE(review): this excerpt is abridged — `id` is never declared here and
// `ok` is never checked. Presumably the full implementation takes id from the
// head of the queue and skips ids that are no longer in f.items; confirm
// against the real source.
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// Look up the item by id and remove it from the map, then call process on it
// (still under f.lock) and return the item with process's error
item, ok := f.items[id]
delete(f.items, id)
err := process(item)
return item, err
}
}
// Now find where the PopProcessFunc comes from: before the Controller is
// created, the Config's Process field is set to the informer's HandleDeltas
// method, so that is the function passed to Pop by processLoop
cfg := &Config{
Process: s.HandleDeltas,
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
for _, d := range obj.(Deltas) {
switch d.Type {
// add, change, replace, synchronize
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
// Go to indexer first
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// If the data already exists, execute the Update logic
iferr := s.indexer.Update(d.Object); err ! =nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// Distribute the Update event
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// Execute Add if no data is found
iferr := s.indexer.Add(d.Object); err ! =nil {
return err
}
// Distribute the Add event
s.processor.distribute(addNotification{newObj: d.Object}, false)}/ / delete
case Deleted:
// delete indexer
iferr := s.indexer.Delete(d.Object); err ! =nil {
return err
}
// Distribute the DELETE event
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}
Copy the code
Index
The Index is the local store for resources; its contents are kept consistent with the resource information in etcd.
// How the Index is created
// NewSharedIndexInformer assembles a sharedIndexInformer around the given
// ListerWatcher and example object. The same resync period is stored as both
// resyncCheckPeriod and defaultEventHandlerResyncPeriod, and one RealClock
// instance is shared by the informer and its processor.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	clk := &clock.RealClock{}
	// The mutation detector is named after the example object's Go type.
	detectorName := fmt.Sprintf("%T", exampleObject)

	return &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: clk},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(detectorName),
		clock:                           clk,
	}
}
// Create a map and func Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
// ThreadSafeStore is a concurrency-safe map
// NewThreadSafeStore returns a threadSafeMap that starts with an empty items
// map and uses the supplied indexers and indices as given.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
	store := &threadSafeMap{
		items:    make(map[string]interface{}),
		indexers: indexers,
		indices:  indices,
	}
	return store
}
Copy the code
distribute
// In the HandleDeltas code above, once the data has been written to the
// indexer, the processor's distribute function is called to pass the
// notification on (arguments omitted in this excerpt)
s.processor.distribute()
// Create the distribution process
func NewSharedIndexInformer(a) SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
}
return sharedIndexInformer
}
// The sharedProcessor structure
type sharedProcessor struct {
listenersStarted bool
/ / read/write locks
listenersLock sync.RWMutex
// Common listener list
listeners []*processorListener
// Synchronize the listener list
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
// The distribute function
// distribute passes obj to every listener in exactly one of the two lists:
// p.syncingListeners when sync is true, p.listeners otherwise. The read side
// of p.listenersLock is held for the duration of the call.
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	// Choose the target list once, then fan out with a single loop.
	targets := p.listeners
	if sync {
		targets = p.syncingListeners
	}
	for i := range targets {
		targets[i].add(obj)
	}
}
// add hands the notification to the listener by sending it on p.addCh.
// NOTE(review): a channel send blocks until a receiver (or buffer space) is
// available; whether addCh is buffered is not visible in this excerpt.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
Copy the code
Summary
- Informer depends on the Reflector module, which has an xxxInformer component, such as podInformer.
- Each resource-specific Informer contains a client connected to kube-apiserver, which queries resource changes through the List and Watch interfaces.
- If a resource change is detected, the Controller puts the data into the DeltaFIFOQueue queue, which completes the production phase.
- On the other side of the DeltaFIFOQueue, consumers continuously process the resource-change events. The processing logic has two main steps:
  - The data is saved to the local store, Indexer, whose underlying implementation is the concurrency-safe threadSafeMap.
  - Some components need to watch resource changes and listen to events in real time, so the events are then sent to the corresponding registered listeners, which process them themselves.
GitHub: github.com/Junedayday/…
Blog: junes.tech/
WeChat official account: Golangcoding