This document explains how Flagger implements canary releases. Along the way it also helps to understand the architecture of client-go and how CRD data is propagated, which is of great help for subsequent development of CRDs.
The overall architecture
The overall architecture follows the client-go pattern, with the addition of two timers and a sync.Map.
An introduction to client-go can be found at github.com/JaneLiuL/ku… .
The job of the timers is to execute the scheduled tasks.
Data flow diagram:
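Before diving into the source, the following is a minimal, self-contained sketch of the pattern just described: one timer drives a control loop that ranges over a sync.Map in which the watched canary resources are cached. It illustrates the idea only and is not Flagger's actual code.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var canaries sync.Map
    canaries.Store("podinfo.test", "canary placeholder")

    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    stopCh := make(chan struct{})

    for {
        select {
        case <-ticker.C:
            // On every tick, walk the cached canaries and schedule work
            canaries.Range(func(key, value interface{}) bool {
                fmt.Printf("scheduling %v\n", key)
                return true // keep iterating
            })
        case <-stopCh:
            return
        }
    }
}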
Source code analysis
main.go
- Initialize the controller
- Register add/update/delete callback events for the resource
- Start the controller
func main() {
    // ... initialization
    c := controller.NewController(
        kubeClient,
        flaggerClient,
        infos,
        controlLoopInterval,
        logger,
        notifierClient,
        canaryFactory,
        routerFactory,
        observerFactory,
        meshProvider,
        version.VERSION,
        fromEnv("EVENT_WEBHOOK_URL", eventWebhook),
    )

    // leader election context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // prevents new requests when leadership is lost
    cfg.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

    // cancel leader election context on shutdown signals
    go func() {
        <-stopCh
        cancel()
    }()

    // wrap controller run
    runController := func() {
        // start the controller
        if err := c.Run(threadiness, stopCh); err != nil {
            logger.Fatalf("Error running controller: %v", err)
        }
    }
    ...
}
controller.go
NewController method
// fluxcd/flagger/pkg/controller/controller.go # NewController
func NewController(
kubeClient kubernetes.Interface,
flaggerClient clientset.Interface,
flaggerInformers Informers,
flaggerWindow time.Duration,
logger *zap.SugaredLogger,
notifier notifier.Interface,
canaryFactory *canary.Factory,
routerFactory *router.Factory,
observerFactory *observers.Factory,
meshProvider string,
version string,
    eventWebhook string,
) *Controller {
    ...
    // Add callback methods for the canary resource
    flaggerInformers.CanaryInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        // When a new resource is created via apply, it is enqueued; the workers
        // started later fetch data from this queue for processing -------- mark 1
        AddFunc: ctrl.enqueue,
        // When a resource is updated
        UpdateFunc: func(old, new interface{}) { ... },
        // When a resource is deleted
        DeleteFunc: func(old interface{}) {
            r, ok := checkCustomResourceType(old, logger)
            if ok {
                ctrl.logger.Infof("Deleting %s.%s from cache", r.Name, r.Namespace)
                // On deletion the resource is removed from sync.Map; if it were
                // not removed, it would stay cached in the map
                ctrl.canaries.Delete(fmt.Sprintf("%s.%s", r.Name, r.Namespace))
            }
        },
    })
}
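The enqueue callback registered above follows the standard client-go pattern. The sketch below shows what such an enqueue helper typically looks like, assuming a Controller struct holding a workqueue.RateLimitingInterface field named workqueue; it illustrates the pattern rather than quoting Flagger's exact code.

package controller

import (
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

type Controller struct {
    workqueue workqueue.RateLimitingInterface
}

// enqueue reduces the object to a "namespace/name" key and pushes it onto
// the rate-limited workqueue for the workers to pick up later.
func (c *Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}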
Run method
// fluxcd/flagger/pkg/controller/controller.go # Run
// Run starts the K8s workers and the canary scheduler
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()
    c.logger.Info("Starting operator")
    for i := 0; i < threadiness; i++ {
        go wait.Until(func() {
            // This call is important: it fetches resources from the queue that
            // was populated on resource creation (mark 1 above shows where the
            // queue data is written)
            for c.processNextWorkItem() {
            }
        }, time.Second, stopCh)
    }
    c.logger.Info("Started operator workers")
    // On top of the standard client-go flow, Flagger adds its own logic here:
    // a timer that processes the canary resources stored in sync.Map
    tickChan := time.NewTicker(c.flaggerWindow).C
    for {
        select {
        case <-tickChan:
            // Important: this creates the CanaryJobs, the timers that process
            // the canary resources ------ mark 2
            c.scheduleCanaries()
        case <-stopCh:
            c.logger.Info("Shutting down operator workers")
            return nil
        }
    }
}
processNextWorkItem
// fluxcd/flagger/pkg/controller/controller.go # processNextWorkItem
func (c *Controller) processNextWorkItem() bool {
    // Fetch a message from the queue
    obj, shutdown := c.workqueue.Get()
    ...
    err := func(obj interface{}) error {
        ...
        // This call is important: it stores the object taken off the queue
        // into sync.Map
        if err := c.syncHandler(key); err != nil {
            return fmt.Errorf("error syncing '%s': %w", key, err)
        }
        // Finally, if no error occurs we Forget this item so it does not
        // get queued again until another change happens.
        c.workqueue.Forget(obj)
        return nil
    }(obj)
    ...
    return true
}
syncHandler
- Initializes the canary CR status
- Stores the canary into sync.Map
// fluxcd/flagger/pkg/controller/controller.go # syncHandler
func (c *Controller) syncHandler(key string) error {
    ...
    // set status condition for new canaries
    if cd.Status.Conditions == nil {
        // Initialize the status in the canary CR
        if err := c.setPhaseInitializing(cd); err != nil {
            c.logger.Errorf("%s unable to set initializing status: %v", key, err)
            return fmt.Errorf("%s initializing error: %w", key, err)
        }
    }
    // Store the canary information
    c.canaries.Store(fmt.Sprintf("%s.%s", cd.Name, cd.Namespace), cd)
    // If opted in for revertOnDeletion, add finalizer if not present
    if cd.Spec.RevertOnDeletion && !hasFinalizer(cd) {
        if err := c.addFinalizer(cd); err != nil {
            return fmt.Errorf("unable to add finalizer to canary %s.%s: %w", cd.Name, cd.Namespace, err)
        }
    }
    c.logger.Infof("Synced %s", key)
    ...
    return nil
}
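As a quick aside, the "<name>.<namespace>" key format shared by syncHandler (Store), the DeleteFunc handler (Delete), and scheduleCanaries (Range) can be exercised in isolation. A minimal standalone sketch, using a placeholder struct instead of Flagger's *flaggerv1.Canary:

package main

import (
    "fmt"
    "sync"
)

type canary struct{ Name, Namespace string }

func main() {
    var canaries sync.Map

    cd := canary{Name: "podinfo", Namespace: "test"}
    key := fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)

    canaries.Store(key, cd) // what syncHandler does
    if v, ok := canaries.Load(key); ok {
        fmt.Printf("cached: %+v\n", v)
    }
    canaries.Delete(key) // what the DeleteFunc handler does
}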
scheduler.go
fluxcd/flagger/pkg/controller/scheduler.go
scheduleCanaries
Traverses sync.Map and creates the scheduled tasks
// fluxcd/flagger/pkg/controller/scheduler.go # scheduleCanaries
func (c *Controller) scheduleCanaries() {
    current := make(map[string]string)
    stats := make(map[string]int)
    c.logger.Infof("----95 canaries----")
    // Traverse sync.Map
    c.canaries.Range(func(key interface{}, value interface{}) bool {
        cn := value.(*flaggerv1.Canary)
        // format: <name>.<namespace>
        name := key.(string)
        current[name] = fmt.Sprintf("%s.%s", cn.Spec.TargetRef.Name, cn.Namespace)
        job, exists := c.jobs[name]
        // Create the CanaryJob
        if (exists && job.GetCanaryAnalysisInterval() != cn.GetAnalysisInterval()) || !exists {
            if exists {
                job.Stop()
            }
            newJob := CanaryJob{
                Name:      cn.Name,
                Namespace: cn.Namespace,
                // This callback is very important: it is the core business logic
                // of the CanaryJob, fluxcd/flagger/pkg/controller/scheduler.go # advanceCanary
                function:         c.advanceCanary,
                done:             make(chan bool),
                ticker:           time.NewTicker(cn.GetAnalysisInterval()),
                analysisInterval: cn.GetAnalysisInterval(),
            }
            c.jobs[name] = newJob
            // The logic inside Start is a timer
            newJob.Start()
        }
        ...
    })
    ...
}
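The Start call at the end of scheduleCanaries runs the job's callback on a ticker until the job is stopped. Below is a minimal sketch of that mechanism; the field names follow the snippet above, but the method bodies are assumptions rather than Flagger's exact implementation.

package main

import "time"

type CanaryJob struct {
    Name      string
    Namespace string
    function  func(name, namespace string)
    done      chan bool
    ticker    *time.Ticker
}

// Start fires the callback on every tick until the done channel is closed
func (j CanaryJob) Start() {
    go func() {
        for {
            select {
            case <-j.ticker.C:
                j.function(j.Name, j.Namespace)
            case <-j.done:
                return
            }
        }
    }()
}

// Stop halts the ticker and terminates the goroutine started by Start
func (j CanaryJob) Stop() {
    j.ticker.Stop()
    close(j.done)
}

func main() {
    job := CanaryJob{
        Name:      "podinfo",
        Namespace: "test",
        function:  func(name, ns string) { println("advance", name, ns) },
        done:      make(chan bool),
        ticker:    time.NewTicker(time.Second),
    }
    job.Start()
    time.Sleep(3 * time.Second)
    job.Stop()
}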
advanceCanary method
- Performs different logic according to the canary's current state (see the sketch after this list)
- Looks up the concrete implementation according to the provider and targetRef.Kind defined in the Canary resource; if you want to add your own implementation, you can directly implement the corresponding interface
- Creates the extra services: if the main service does not exist, it is created along with the *-primary and *-canary services
- Creates the *-canary ingress. If ingressRef is not defined in the canary, the canary release will fail, because this canary setup relies heavily on ingress
- This method contains the implementation logic for the concrete A/B canary
Flow charts are on their way…
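Until the flow charts arrive, here is a minimal sketch of the state dispatch that advanceCanary performs: the handling branches on the canary's current phase. The phase names match the flaggerv1 constants, while the branch bodies are placeholder summaries, not Flagger's actual code.

func advanceCanarySketch(phase string) {
    switch phase {
    case "Initializing":
        // create the primary/canary services and initialize the primary workload
    case "Progressing":
        // run the metric checks, step the traffic weight, or roll back on failure
    case "Promoting":
        // copy the canary spec over the primary and finish the promotion
    case "Succeeded", "Failed":
        // terminal states: wait until the target deployment changes again
    }
}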
A provider implements the following interface
type Interface interface {
Reconcile(canary *flaggerv1.Canary) error
SetRoutes(canary *flaggerv1.Canary, primaryWeight int, canaryWeight int, mirrored bool) error
GetRoutes(canary *flaggerv1.Canary) (primaryWeight int, canaryWeight int, mirrored bool, err error)
Finalize(canary *flaggerv1.Canary) error
}
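To make this extension point concrete, the following is a hedged sketch of a do-nothing router that satisfies the Provider interface above. The type name NopRouter and its fixed 100/0 weights are illustrative assumptions, not part of Flagger.

type NopRouter struct{}

func (r *NopRouter) Reconcile(canary *flaggerv1.Canary) error { return nil }

// SetRoutes is where a real implementation would push the weights to the
// underlying mesh or ingress; this sketch accepts them silently
func (r *NopRouter) SetRoutes(canary *flaggerv1.Canary, primaryWeight int, canaryWeight int, mirrored bool) error {
    return nil
}

// GetRoutes reports all traffic on primary and none on canary
func (r *NopRouter) GetRoutes(canary *flaggerv1.Canary) (primaryWeight int, canaryWeight int, mirrored bool, err error) {
    return 100, 0, false, nil
}

func (r *NopRouter) Finalize(canary *flaggerv1.Canary) error { return nil }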
A controller implements the following interface
type Controller interface {
    IsPrimaryReady(canary *flaggerv1.Canary) error
    IsCanaryReady(canary *flaggerv1.Canary) (bool, error)
    GetMetadata(canary *flaggerv1.Canary) (string, string, map[string]int32, error)
    SyncStatus(canary *flaggerv1.Canary, status flaggerv1.CanaryStatus) error
    SetStatusFailedChecks(canary *flaggerv1.Canary, val int) error
    SetStatusWeight(canary *flaggerv1.Canary, val int) error
    SetStatusIterations(canary *flaggerv1.Canary, val int) error
    SetStatusPhase(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error
    Initialize(canary *flaggerv1.Canary) error
    Promote(canary *flaggerv1.Canary) error
    HasTargetChanged(canary *flaggerv1.Canary) (bool, error)
    HaveDependenciesChanged(canary *flaggerv1.Canary) (bool, error)
    ScaleToZero(canary *flaggerv1.Canary) error
    ScaleFromZero(canary *flaggerv1.Canary) error
    Finalize(canary *flaggerv1.Canary) error
}
Canary process walkthrough
Flagger's canary deployment example is used here to help understand the canary release. The official YAML is attached at the end for easy reference.
Example
This example contains three YAML files; please apply them one by one during verification. Make sure Flagger is installed on your K8s cluster before you start.
podinfo.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: podinfo
namespace: test
labels:
app: podinfo
spec:
replicas: 2
minReadySeconds: 5
revisionHistoryLimit: 5
progressDeadlineSeconds: 60
strategy:
rollingUpdate:
maxUnavailable: 1
type: RollingUpdate
selector:
matchLabels:
app: podinfo
template:
metadata:
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9797"
labels:
app: podinfo
spec:
containers:
- name: podinfod
image: stefanprodan/podinfo:3.1.1
imagePullPolicy: IfNotPresent
ports:
- name: http
containerPort: 9898
protocol: TCP
- name: http-metrics
containerPort: 9797
protocol: TCP
- name: grpc
containerPort: 9999
protocol: TCP
command:
- ./podinfo
- --port=9898
- --port-metrics=9797
- --grpc-port=9999
- --grpc-service-name=podinfo
- --level=info
- --random-delay=false
- --random-error=false
env:
- name: PODINFO_UI_COLOR
value: "#34577c"
livenessProbe:
exec:
command:
- podcli
- check
- http
- localhost:9898/healthz
initialDelaySeconds: 5
timeoutSeconds: 5
readinessProbe:
exec:
command:
- podcli
- check
- http
- localhost:9898/readyz
initialDelaySeconds: 5
timeoutSeconds: 5
resources:
limits:
cpu: 2000m
memory: 512Mi
requests:
cpu: 100m
memory: 64Mi
ingress-podinfo.yaml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
name: podinfo
namespace: test
labels:
app: podinfo
annotations:
kubernetes.io/ingress.class: "nginx"
spec:
rules:
- host: podinfo.test.jd.com
http:
paths:
- backend:
serviceName: podinfo
servicePort: 80
canary.yaml
This file has been trimmed down from the official example to make it easier to verify the main flow
apiVersion: flagger.app/v1beta1
kind: Canary
metadata:
name: podinfo
namespace: test
spec:
provider: nginx
# deployment reference
targetRef:
apiVersion: apps/v1
kind: Deployment
name: podinfo
# ingress reference
ingressRef:
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
name: podinfo
# HPA reference (optional)
# autoscalerRef:
# apiVersion: autoscaling/v2beta2
# kind: HorizontalPodAutoscaler
# name: podinfo
# the maximum time in seconds for the canary deployment
# to make progress before it is rolled back (default 600s)
progressDeadlineSeconds: 60
service:
# ClusterIP port number
port: 80
# container port number or name
targetPort: 9898
analysis:
# schedule interval (default 60s)
interval: 10s
# max number of failed metric checks before rollback
threshold: 10
# max traffic percentage routed to canary
# percentage (0-100)
maxWeight: 100
# canary increment step
# percentage (0-100)
stepWeight: 20
# NGINX Prometheus checks
#metrics:
#- name: request-success-rate
# # minimum req success rate (non 5xx responses)
# percentage (0-100)
# thresholdRange:
# min: 99
#interval: 1m
# testing (optional)
# webhooks:
# - name: acceptance-test
# type: pre-rollout
# url: http://flagger-loadtester.test/
# timeout: 30s
# metadata:
# type: bash
# cmd: "curl -sd 'test' http://podinfo-canary/token | grep token"
# - name: load-test
# url: http://flagger-loadtester.test/
# timeout: 5s
# metadata:
# cmd: "hey -z 1m -q 10 -c 2 http://podinfo.test.jd.com/"
The main logic
- How does Flagger detect a new version change of a user's Deployment?
In the scheduled task, the CanaryJob invokes advanceCanary in fluxcd/flagger/pkg/controller/scheduler.go. This method judges whether the user's deployment has been updated; if there is an update, it moves Canary.Status out of its terminal state so that a new analysis run begins.
- There are two important sets of interfaces
1. Every targetRef kind supported by the canary needs to implement the interface in fluxcd/flagger/pkg/canary/controller.go
2. Every provider type supported by the canary needs to implement the interface in fluxcd/flagger/pkg/router/router.go
- How is the status initialized after applying canary.yaml?
The initialization happens in the syncHandler method of fluxcd/flagger/pkg/controller/controller.go, which also stores the canary into sync.Map so that the scheduled tasks can later fetch it from sync.Map.
- How does client-go show up in this project?
client-go is used here as well, but in a way similar to tektoncd: the interfaces are implemented directly with a custom implementation. The benefit of this approach is that the logic is more flexible and more controllable.
- The two fields LastAppliedSpec and LastPromotedSpec in the Canary status CRD
Both values hold a hash of deployment.spec.template.
LastAppliedSpec is assigned during sync from the currently applied deployment.spec.template, and it is used when judging whether the deployment has been updated, in fluxcd/flagger/pkg/canary/status.go # syncCanaryStatus (a hash sketch follows below).
LastPromotedSpec copies the value of LastAppliedSpec once the canary reaches a final state (Initialized/Succeeded).
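A minimal sketch of how such a spec-template hash can be computed: FNV-hash a deterministic deep dump of the pod template, the same pattern the Kubernetes controller utilities use. This mirrors the idea behind LastAppliedSpec, not necessarily Flagger's exact code.

package main

import (
    "fmt"
    "hash/fnv"

    "github.com/davecgh/go-spew/spew"
)

func computeHash(template interface{}) string {
    hasher := fnv.New32a()
    // Deep-print the object deterministically, then hash the bytes
    printer := spew.ConfigState{Indent: " ", SortKeys: true, DisableMethods: true, SpewKeys: true}
    printer.Fprintf(hasher, "%#v", template)
    return fmt.Sprintf("%x", hasher.Sum32())
}

func main() {
    tpl := map[string]string{"image": "stefanprodan/podinfo:3.1.1"}
    // Comparing the stored hash with a freshly computed one reveals updates
    fmt.Println(computeHash(tpl))
}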
- How is the traffic ratio achieved?
First of all, it depends on the provider being used.
For example, nginx ingress allows traffic weights to be set via annotations.
fluxcd/flagger/pkg/router/ingress.go # SetRoutes sets the annotation values on the ingress, as sketched below.
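A minimal sketch of that annotation-based weighting, using the standard NGINX ingress canary annotations. The helper name setCanaryWeight is an illustration of the concept behind SetRoutes, not Flagger's code; it also uses the networking/v1 client, whereas the manifests above still use v1beta1.

package example

import (
    "context"
    "strconv"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// setCanaryWeight routes `weight` percent of traffic to the canary ingress
// by setting the NGINX canary annotations on it
func setCanaryWeight(client kubernetes.Interface, namespace, name string, weight int) error {
    ing, err := client.NetworkingV1().Ingresses(namespace).Get(context.TODO(), name, metav1.GetOptions{})
    if err != nil {
        return err
    }
    if ing.Annotations == nil {
        ing.Annotations = map[string]string{}
    }
    ing.Annotations["nginx.ingress.kubernetes.io/canary"] = "true"
    ing.Annotations["nginx.ingress.kubernetes.io/canary-weight"] = strconv.Itoa(weight)
    _, err = client.NetworkingV1().Ingresses(namespace).Update(context.TODO(), ing, metav1.UpdateOptions{})
    return err
}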