This document explains how Flagger implements canary releases. Along the way it also shows the client-go architecture and how CRD data flows through the controller, which is helpful background for developing your own CRDs.

The overall architecture

The overall architecture follows the client-go controller pattern, with the addition of two timers and a sync.Map.

Client-go can be found at github.com/JaneLiuL/ku… .

The timers are used to run the scheduled canary tasks.
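To make this pattern concrete before reading the source, here is a minimal, self-contained sketch of the sync.Map + timer combination (all names are invented for illustration; this is not Flagger's code): producers store resources into the map, and a ticker periodically walks it to schedule work.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var canaries sync.Map // key: "<name>.<namespace>", value: the canary object

	// Producer side: in Flagger this is done by syncHandler when a Canary is applied.
	canaries.Store("podinfo.test", "canary placeholder")

	// Consumer side: a ticker periodically walks the map and schedules work,
	// which is what scheduleCanaries does on every flaggerWindow tick.
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	stopCh := make(chan struct{})
	for {
		select {
		case <-ticker.C:
			canaries.Range(func(key, value interface{}) bool {
				fmt.Printf("scheduling %v\n", key)
				return true // returning true keeps the iteration going
			})
		case <-stopCh:
			return
		}
	}
}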

Data flow diagram:

Source code walkthrough

main.go

  • Initialize the Controller
    • Registers the add/update/delete callbacks for the Canary resource
  • Start the controller
func main() {
	... // initialization
	c := controller.NewController(
		kubeClient,
		flaggerClient,
		infos,
		controlLoopInterval,
		logger,
		notifierClient,
		canaryFactory,
		routerFactory,
		observerFactory,
		meshProvider,
		version.VERSION,
		fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
	)

	// leader election context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// prevents new requests when leadership is lost
	cfg.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

	// cancel leader election context on shutdown signals
	go func() {
		<-stopCh
		cancel()
	}()

	// wrap controller run
	runController := func() {
		// start the controller
		if err := c.Run(threadiness, stopCh); err != nil {
			logger.Fatalf("Error running controller: %v", err)
		}
	}
	...
}

controller.go

NewController method
// fluxcd/flagger/pkg/controller/controller.go # NewController
func NewController(
	kubeClient kubernetes.Interface,
	flaggerClient clientset.Interface,
	flaggerInformers Informers,
	flaggerWindow time.Duration,
	logger *zap.SugaredLogger,
	notifier notifier.Interface,
	canaryFactory *canary.Factory,
	routerFactory *router.Factory,
	observerFactory *observers.Factory,
	meshProvider string,
	version string,
	eventWebhook string,
) *Controller {
	...
	// Register the callbacks for the Canary resource
	flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// When a new Canary is applied it is put onto the workqueue; the workers
		// started later in Run take items off this queue for processing -------- (1)
		AddFunc: ctrl.enqueue,
		// When a Canary is updated
		UpdateFunc: func(old, new interface{}) { ... },
		// When a Canary is deleted
		DeleteFunc: func(old interface{}) {
			r, ok := checkCustomResourceType(old, logger)
			if ok {
				ctrl.logger.Infof("Deleting %s.%s from cache", r.Name, r.Namespace)
				// Remove the canary from the sync.Map on deletion; otherwise it
				// would stay in the map and keep being scheduled
				ctrl.canaries.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace))
			}
		},
	})

}

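For reference, the ctrl.enqueue registered above follows the standard client-go pattern: derive a <namespace>/<name> key from the object and push it onto the workqueue. A minimal sketch of that pattern (not necessarily Flagger's exact code):

import (
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/tools/cache"
)

// enqueue converts the object into a cache key and adds it to the workqueue,
// where the workers started in Run will pick it up.
func (c *Controller) enqueue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}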
Run method
// fluxcd/flagger/pkg/controller/controller.go # Run
// Run starts the K8s workers and the canary scheduler
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	c.logger.Info("Starting operator")

	for i := 0; i < threadiness; i++ {
		go wait.Until(func() {
			// Important: this drains the workqueue that the informer callbacks
			// write to; see point (1) for where the items are enqueued
			for c.processNextWorkItem() {
			}
		}, time.Second, stopCh)
	}

	c.logger.Info("Started operator workers")

	// On top of the standard client-go loop, Flagger adds its own logic here:
	// a ticker that periodically schedules the canaries stored in the sync.Map
	tickChan := time.NewTicker(c.flaggerWindow).C
	for {
		select {
		case <-tickChan:
			// Important: this creates the CanaryJobs, the timers that process
			// each canary resource ------ (2)
			c.scheduleCanaries()
		case <-stopCh:
			c.logger.Info("Shutting down operator workers")
			return nil
		}
	}
}
processNextWorkItem
// fluxcd/flagger/pkg/controller/controller.go # processNextWorkItem
func (c *Controller) processNextWorkItem() bool {
  // Get messages from the queue
	obj, shutdown := c.workqueue.Get()

	...
	
		// Important: syncHandler stores the object taken off the queue into the sync.Map
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %w", key, err)
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		return nil
	}(obj)

...
}
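The elided parts above follow the usual client-go workqueue pattern (Get, Done, Forget). A generic sketch of the full loop in the sample-controller style (not a copy of Flagger's implementation):

func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}

	err := func(obj interface{}) error {
		// Always mark the item as done so the workqueue can release it.
		defer c.workqueue.Done(obj)

		key, ok := obj.(string)
		if !ok {
			// The item is unusable; forget it so it is not retried forever.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}

		// Hand the key to syncHandler, which stores the canary into the sync.Map.
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %w", key, err)
		}

		// Forget the item so it is not re-queued until the next change.
		c.workqueue.Forget(obj)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
	}
	return true
}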
syncHandler
  • Initializes the status of the Canary CR
  • Stores the canary in the sync.Map
// fluxcd/flagger/pkg/controller/controller.go # syncHandler
func (c *Controller) syncHandler(key string) error {
	...
	// set status condition for new canaries
	if cd.Status.Conditions == nil {
		// Initialize the status in the Canary CR
		if err := c.setPhaseInitializing(cd); err != nil {
			c.logger.Errorf("%s unable to set initializing status: %v", key, err)
			return fmt.Errorf("%s initializing error: %w", key, err)
		}
	}

	// Store the canary information
	c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)

	// If opt in for revertOnDeletion add finalizer if not present
	if cd.Spec.RevertOnDeletion && !hasFinalizer(cd) {
		if err := c.addFinalizer(cd); err != nil {
			return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
		}

	}
	c.logger.Infof("Synced %s", key)
...
	return nil
}

scheduler.go

fluxcd/flagger/pkg/controller/scheduler.go

scheduleCanaries

Iterates over the sync.Map and creates the scheduled jobs.

// fluxcd/flagger/pkg/controller/scheduler.go # scheduleCanaries
func (c *Controller) scheduleCanaries() {
	current := make(map[string]string)
	stats := make(map[string]int)
	c.logger.Infof("----95 canaries----")

  / / traverse sync. The Map
	c.canaries.Range(func(key interface{}, value interface{}) bool {
		cn := value.(*flaggerv1.Canary)

		// format: <name>.<namespace>
		name := key.(string)

		current[name] = fmt.Sprintf("%s.%s", cn.Spec.TargetRef.Name, cn.Namespace)

		job, exists := c.jobs[name]
		// Create the CanaryJob
		if (exists && job.GetCanaryAnalysisInterval() != cn.GetAnalysisInterval()) || !exists {
			if exists {
				job.Stop()
			}

			newJob := CanaryJob{
				Name:             cn.Name,
				Namespace:        cn.Namespace,
				// This callback is the CanaryJob's core business logic:
				// fluxcd/flagger/pkg/controller/scheduler.go # advanceCanary
				function:         c.advanceCanary,
				done:             make(chan bool),
				ticker:           time.NewTicker(cn.GetAnalysisInterval()),
				analysisInterval: cn.GetAnalysisInterval(),
			}

			c.jobs[name] = newJob
			// Start runs a ticker that invokes the callback on every interval
			newJob.Start()
		}

		...
}
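As the comment notes, the logic inside Start is just a timer. A minimal sketch of such a ticker-driven job, with illustrative names (the real CanaryJob lives in fluxcd/flagger/pkg/controller):

import "time"

// tickerJobSketch is an illustrative stand-in for CanaryJob: it runs a callback
// on every tick until Stop is called.
type tickerJobSketch struct {
	name      string
	namespace string
	function  func(name, namespace string)
	done      chan bool
	ticker    *time.Ticker
}

func (j tickerJobSketch) Start() {
	go func() {
		// Run once immediately, then again on every tick, until done is closed.
		j.function(j.name, j.namespace)
		for {
			select {
			case <-j.ticker.C:
				j.function(j.name, j.namespace)
			case <-j.done:
				return
			}
		}
	}()
}

func (j tickerJobSketch) Stop() {
	j.ticker.Stop()
	close(j.done)
}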
AdvanceCanary method
  • Performs different logic depending on the canary's current phase
  • Looks up the concrete implementations based on the provider and targetRef.Kind defined in the Canary resource; to add your own implementation, implement the corresponding interface
  • Creates the additional Services: if the main Service has not been created yet, it is created along with the *-primary and *-canary Services
  • Creates the *-canary Ingress. If ingressRef is not defined in the Canary, the release will fail, because this canary setup relies heavily on the Ingress
  • This method also contains the implementation logic for the concrete strategies such as A/B testing and canary

Flow charts are on their way…
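Until the flow chart arrives, the following is a heavily simplified, illustrative outline of the order of operations described in the list above; every type and helper name here is a placeholder invented for the sketch, not Flagger's real API.

package main

// canarySketch carries only the fields the outline needs.
type canarySketch struct {
	Phase      string // e.g. "Initializing", "Progressing", "Succeeded"
	Provider   string // spec.provider, e.g. "nginx"
	TargetKind string // targetRef.Kind, e.g. "Deployment"
}

// routerSketch mirrors the role of the provider/router implementations.
type routerSketch interface {
	Reconcile(cd *canarySketch) error                      // create/repair Services and Ingress
	SetRoutes(cd *canarySketch, primary, canary int) error // shift traffic weights
}

// workloadSketch mirrors the role of the targetRef controller implementations.
type workloadSketch interface {
	Initialize(cd *canarySketch) error // create the *-primary workload
	Promote(cd *canarySketch) error    // copy the canary spec over the primary
}

// advanceCanarySketch is the per-tick callback: given the implementations
// selected for the canary's provider and targetRef.Kind, it ensures the routing
// objects exist and then branches on the canary phase.
func advanceCanarySketch(cd *canarySketch, workload workloadSketch, router routerSketch) {
	// 1. Ensure the main/*-primary/*-canary Services and the *-canary Ingress exist.
	if err := router.Reconcile(cd); err != nil {
		return
	}

	// 2. Branch on the canary's current phase.
	switch cd.Phase {
	case "Initializing":
		_ = workload.Initialize(cd)
	case "Progressing":
		// Step the canary weight, run the metric checks, then promote or roll back.
		_ = router.SetRoutes(cd, 80, 20)
	default:
		// Finished phases: wait until the target workload changes again.
	}
}

func main() {}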

Each provider implements the following interface

type Interface interface {
	Reconcile(canary *flaggerv1.Canary) error
	SetRoutes(canary *flaggerv1.Canary, primaryWeight int, canaryWeight int, mirrored bool) error
	GetRoutes(canary *flaggerv1.Canary) (primaryWeight int, canaryWeight int, mirrored bool, err error)
	Finalize(canary *flaggerv1.Canary) error
}
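As mentioned above, supporting a new provider comes down to implementing this interface. A hypothetical no-op implementation as a sketch (a real router would call the mesh or ingress controller's API in each method):

// noopRouter is a hypothetical provider that satisfies the interface above
// while doing nothing; it only remembers the weights it was given.
type noopRouter struct {
	primaryWeight int
	canaryWeight  int
}

func (r *noopRouter) Reconcile(canary *flaggerv1.Canary) error {
	// Create or update the provider-specific routing objects here.
	return nil
}

func (r *noopRouter) SetRoutes(canary *flaggerv1.Canary, primaryWeight int, canaryWeight int, mirrored bool) error {
	r.primaryWeight, r.canaryWeight = primaryWeight, canaryWeight
	return nil
}

func (r *noopRouter) GetRoutes(canary *flaggerv1.Canary) (primaryWeight int, canaryWeight int, mirrored bool, err error) {
	return r.primaryWeight, r.canaryWeight, false, nil
}

func (r *noopRouter) Finalize(canary *flaggerv1.Canary) error {
	// Restore routing to its pre-canary state here.
	return nil
}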

Each target workload controller implements the following interface

type Controller interface {
	IsPrimaryReady(canary *flaggerv1.Canary) error
	IsCanaryReady(canary *flaggerv1.Canary) (bool, error)
	GetMetadata(canary *flaggerv1.Canary) (string, string, map[string]int32, error)
	SyncStatus(canary *flaggerv1.Canary, status flaggerv1.CanaryStatus) error
	SetStatusFailedChecks(canary *flaggerv1.Canary, val int) error
	SetStatusWeight(canary *flaggerv1.Canary, val int) error
	SetStatusIterations(canary *flaggerv1.Canary, val int) error
	SetStatusPhase(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error
	Initialize(canary *flaggerv1.Canary) error
	Promote(canary *flaggerv1.Canary) error
	HasTargetChanged(canary *flaggerv1.Canary) (bool, error)
	HaveDependenciesChanged(canary *flaggerv1.Canary) (bool, error)
	ScaleToZero(canary *flaggerv1.Canary) error
	ScaleFromZero(canary *flaggerv1.Canary) error
	Finalize(canary *flaggerv1.Canary) error
}

Walking through the canary process

Flagger's podinfo canary deployment example is used below to walk through a canary release; the official YAML (slightly trimmed) is included for easy reference.

Example

This example contains three YAML files; apply them one by one when verifying. Make sure Flagger is installed in your Kubernetes cluster before you start.

podinfo.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: podinfo
  namespace: test
  labels:
    app: podinfo
spec:
  replicas: 2
  minReadySeconds: 5
  revisionHistoryLimit: 5
  progressDeadlineSeconds: 60
  strategy:
    rollingUpdate:
      maxUnavailable: 1
    type: RollingUpdate
  selector:
    matchLabels:
      app: podinfo
  template:
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9797"
      labels:
        app: podinfo
    spec:
      containers:
      - name: podinfod
        image: stefanprodan/podinfo:3.1.1
        imagePullPolicy: IfNotPresent
        ports:
        - name: http
          containerPort: 9898
          protocol: TCP
        - name: http-metrics
          containerPort: 9797
          protocol: TCP
        - name: grpc
          containerPort: 9999
          protocol: TCP
        command:
        - ./podinfo
        - --port=9898
        - --port-metrics=9797
        - --grpc-port=9999
        - --grpc-service-name=podinfo
        - --level=info
        - --random-delay=false
        - --random-error=false
        env:
        - name: PODINFO_UI_COLOR
          value: "#34577c"
        livenessProbe:
          exec:
            command:
            - podcli
            - check
            - http
            - localhost:9898/healthz
          initialDelaySeconds: 5
          timeoutSeconds: 5
        readinessProbe:
          exec:
            command:
            - podcli
            - check
            - http
            - localhost:9898/readyz
          initialDelaySeconds: 5
          timeoutSeconds: 5
        resources:
          limits:
            cpu: 2000m
            memory: 512Mi
          requests:
            cpu: 100m
            memory: 64Mi

ingress-podinfo.yaml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: podinfo
  namespace: test
  labels:
    app: podinfo
  annotations:
    kubernetes.io/ingress.class: "nginx"
spec:
  rules:
    - host: podinfo.test.jd.com
      http:
        paths:
          - backend:
              serviceName: podinfo
              servicePort: 80

canary.yaml

This file has been trimmed from the official example to make it easier to verify the main flow.

apiVersion: flagger.app/v1beta1
kind: Canary
metadata:
  name: podinfo
  namespace: test
spec:
  provider: nginx
  # deployment reference
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: podinfo
  # ingress reference
  ingressRef:
    apiVersion: networking.k8s.io/v1beta1
    kind: Ingress
    name: podinfo
  # HPA reference (optional)
  # autoscalerRef:
  # apiVersion: autoscaling/v2beta2
  # kind: HorizontalPodAutoscaler
  # name: podinfo
  # the maximum time in seconds for the canary deployment
  # to make progress before it is rollback (default 600s)
  progressDeadlineSeconds: 60
  service:
    # ClusterIP port number
    port: 80
    # container port number or name
    targetPort: 9898
  analysis:
    # schedule interval (default 60s)
    interval: 10s
    # max number of failed metric checks before rollback
    threshold: 10
    # max traffic percentage routed to canary
    # percentage (0-100)
    maxWeight: 100
    # canary increment step
    # percentage (0-100)
    stepWeight: 20
    # NGINX Prometheus checks
    #metrics:
    #- name: request-success-rate
    # # minimum req success rate (non 5xx responses)
      # percentage (0-100)
      # thresholdRange:
      # min: 99
      #interval: 1m
    # testing (optional)
    # webhooks:
    # - name: acceptance-test
    # type: pre-rollout
    # url: http://flagger-loadtester.test/
    # timeout: 30s
    # metadata:
    # type: bash
    # cmd: "curl -sd 'test' http://podinfo-canary/token | grep token"
    # - name: load-test
    # url: http://flagger-loadtester.test/
    # timeout: 5s
    # metadata:
    # cmd: "hey -z 1m -q 10 -c 2 http://podinfo.test.jd.com/"


The main logic

  • How does Flagger detect a new version of the user's Deployment?

On each tick, the scheduled newJob invokes advanceCanary in fluxcd/flagger/pkg/controller/scheduler.go. That method checks whether the user's Deployment has been updated; if it has, it moves Canary.Status out of its finished state so a new analysis can begin.

  • There are two important sets of interfaces

1. Every workload kind supported as a canary targetRef must implement the interface in fluxcd/flagger/pkg/canary/controller.go.

2. Every provider supported by the canary must implement the interface in fluxcd/flagger/pkg/router/router.go.

  • How is the status initialized after applying canary.yaml?

The syncHandler method in fluxcd/flagger/pkg/controller/controller.go initializes the status and stores the canary in the sync.Map, so that the scheduled jobs can later fetch it from the sync.Map.

  • How is client-go used in this project?

client-go is used in this project as well, but in a way similar to tektoncd: the interfaces are implemented directly with the project's own implementations, which makes the local logic more flexible and controllable.

  • The two fields LastAppliedSpec and LastPromotedSpec in the Canary status

Both fields hold a hash of deployment.spec.template.

LastAppliedSpec is assigned during sync to the hash of the Deployment's current spec.template, and it is used when checking whether the Deployment has been updated; see fluxcd/flagger/pkg/canary/status.go # syncCanaryStatus.

LastPromotedSpec is copied from LastAppliedSpec when the canary reaches a finished state (Initialized/Succeeded).
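A small sketch of that idea (not Flagger's actual hashing code, which lives under fluxcd/flagger/pkg/canary): hash deployment.spec.template, keep the result in LastAppliedSpec during sync, and detect a new revision by comparing it against a freshly computed hash.

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
)

// specHash returns a short hash of the Deployment's pod template; an
// illustrative stand-in for the value kept in LastAppliedSpec.
func specHash(dep *appsv1.Deployment) (string, error) {
	b, err := json.Marshal(dep.Spec.Template)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return fmt.Sprintf("%x", sum[:8]), nil
}

// hasTargetChanged compares the stored hash with the hash of the Deployment
// as it exists in the cluster right now.
func hasTargetChanged(lastAppliedSpec string, dep *appsv1.Deployment) (bool, error) {
	h, err := specHash(dep)
	if err != nil {
		return false, err
	}
	return h != lastAppliedSpec, nil
}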

  • How is the traffic split implemented?

First, it depends on what the provider in use supports.

For example, NGINX Ingress allows the traffic weight to be set via annotations.

SetRoutes in fluxcd/flagger/pkg/router/ingress.go sets the corresponding annotation values on the Ingress.
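As an illustration of that annotation-based weighting for the nginx provider (this shows the general NGINX Ingress canary mechanism, not a copy of Flagger's SetRoutes):

import "strconv"

// nginxCanaryAnnotations builds the annotations that the NGINX ingress
// controller reads from the *-canary Ingress to split traffic by weight.
func nginxCanaryAnnotations(canaryWeight int) map[string]string {
	return map[string]string{
		"nginx.ingress.kubernetes.io/canary":        "true",
		"nginx.ingress.kubernetes.io/canary-weight": strconv.Itoa(canaryWeight),
	}
}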