Background

Some time ago I started working with Kubernetes CronJob and, once the number of cronjobs reached a certain scale, ran into noticeable schedule delays. That prompted me to read the controller code, where I found a few problems and tried to tune them.

Existing problems

From actual tests in our production environment, with roughly 250-375 cronjobs on a */1 * * * * schedule, delay builds up every minute. Neither the CronJobs nor the Controller Manager reported any abnormal events, yet the newly created jobs kept starting later and later. Because we had also set startingDeadlineSeconds, the accumulated delay eventually caused the cron tasks to lag seriously behind schedule.
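For reference, a hypothetical CronJob of the shape we were running (the name and values are illustrative, not our real workload, and the usual batchv1beta1/metav1 imports are assumed); it is the combination of a per-minute schedule with startingDeadlineSeconds that turns accumulated controller delay into skipped runs:

// Illustrative only: a per-minute CronJob with startingDeadlineSeconds set.
deadline := int64(30)
cj := &batchv1beta1.CronJob{
	ObjectMeta: metav1.ObjectMeta{Name: "minutely-task", Namespace: "default"},
	Spec: batchv1beta1.CronJobSpec{
		Schedule:                "*/1 * * * *",
		StartingDeadlineSeconds: &deadline,
		ConcurrencyPolicy:       batchv1beta1.ForbidConcurrent,
		JobTemplate:             batchv1beta1.JobTemplateSpec{ /* pod template omitted */ },
	},
}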

Code reading

To analyze the problems above I read through the CronJob controller code. There is not much of it. Perhaps because the feature has not reached GA, the whole controller is written in a procedural style and does not use components such as Informer or Reflector, which makes parts of it awkward to read.

Let’s read the k8s cronjob controller code from the release-1.17 branch.

  • Controller struct
type Controller struct {
	kubeClient clientset.Interface 
	jobControl jobControlInterface
	sjControl  sjControlInterface
	podControl podControlInterface
	recorder   record.EventRecorder
}

This is the CronJob controller structure (the receiver is named jm, for job manager). It mainly holds the Kubernetes API client kubeClient, plus the jobControl and sjControl control blocks for Jobs and CronJobs. The CronJob controller operates directly on Job objects; the Jobs in turn create the Pods, so the controller does not touch Pod objects directly (not even to read them).
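For context, the two control blocks are thin wrappers around the API client. A paraphrased sketch of their interfaces as I remember them from the release-1.17 injection.go (treat the exact method sets and signatures as approximate):

// jobControlInterface is the interface the controller uses to manipulate Jobs.
type jobControlInterface interface {
	GetJob(namespace, name string) (*batchv1.Job, error)
	CreateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error)
	UpdateJob(namespace string, job *batchv1.Job) (*batchv1.Job, error)
	PatchJob(namespace string, name string, pt types.PatchType, data []byte, subresources ...string) (*batchv1.Job, error)
	DeleteJob(namespace string, name string) error
}

// sjControlInterface is the interface the controller uses to update CronJob status.
type sjControlInterface interface {
	UpdateStatus(sj *batchv1beta1.CronJob) (*batchv1beta1.CronJob, error)
}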

  • The entry function Run:
// Run starts the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	klog.Infof("Starting CronJob Manager")
	// Check things every 10 second.
	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
	<-stopCh
	klog.Infof("Shutting down CronJob Manager")}Copy the code

The CronJob controller is a single-threaded scheduler with one execution flow: a single goroutine runs syncAll every 10 seconds via wait.Until. Note that wait.Until only arms the next tick after the function returns (a sliding period), so however long one syncAll pass takes is added on top of the 10-second interval.
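A minimal standalone sketch of that sliding behaviour (my own illustration, not from the controller source; the timings are made up):

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go func() { time.Sleep(35 * time.Second); close(stopCh) }()

	start := time.Now()
	wait.Until(func() {
		fmt.Printf("tick at +%v\n", time.Since(start).Round(time.Second))
		time.Sleep(5 * time.Second) // pretend one syncAll pass takes 5s
	}, 10*time.Second, stopCh)
	// Ticks land roughly every 15s (5s of work + 10s period), not every 10s.
}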

  • The main loop function syncAll
// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *Controller) syncAll() {
	// List children (Jobs) before parents (CronJob).
	// This guarantees that if we see any Job that got orphaned by the GC orphan finalizer,
	// we must also see that the parent CronJob has non-nil DeletionTimestamp (see #42639).
	// Note that this only works because we are NOT using any caches here.
	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(opts)
	}

	js := make([]batchv1.Job, 0)
	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		jobTmp, ok := object.(*batchv1.Job)
		if !ok {
			return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
		}
		js = append(js, *jobTmp)
		return nil
	})

	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract job list: %v", err))
		return
	}

	klog.V(4).Infof("Found %d jobs", len(js))
	cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(opts)
	}

	jobsBySj := groupJobsByParent(js)
	klog.V(4).Infof("Found %d groups", len(jobsBySj))
	err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		sj, ok := object.(*batchv1beta1.CronJob)
		if !ok {
			return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", sj)
		}
		syncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder)
		cleanupFinishedJobs(sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder)
		return nil
	})

	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
		return
	}
}

pager.New(pager.SimplePageFunc(jobListFunc)) lists all Jobs page by page through jobListFunc and appends them to the slice js := make([]batchv1.Job, 0). That slice is later grouped by parent CronJob and used during sync to decide whether a given cronjob has already been triggered.
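The grouping itself is done by groupJobsByParent, which buckets jobs by the UID of their owning CronJob. Roughly (paraphrased from the 1.17 source, details may differ slightly):

func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job {
	jobsBySj := make(map[types.UID][]batchv1.Job)
	for _, job := range js {
		// getParentUIDFromJob reads the controller owner reference of the Job
		parentUID, found := getParentUIDFromJob(job)
		if !found {
			klog.V(4).Infof("Unable to get parent uid from job %s in namespace %s", job.Name, job.Namespace)
			continue
		}
		jobsBySj[parentUID] = append(jobsBySj[parentUID], job)
	}
	return jobsBySj
}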

In the same way, pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem lists all CronJob objects and calls syncOne on each of them to do the actual cronjob scheduling. After scheduling, cleanupFinishedJobs is called to clean up; it does the following (a paraphrased sketch of it follows the list):

– For successfully finished jobs, keep at most successfulJobsHistoryLimit of them and delete the oldest ones from the apiserver

– For failed jobs, do the same pruning according to failedJobsHistoryLimit

– Jobs that are still running are left untouched
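Roughly, cleanupFinishedJobs looks like the following (a paraphrased sketch of the 1.17 code; helper names such as removeOldestJobs come from that file, but treat the exact code as approximate):

func cleanupFinishedJobs(sj *batchv1beta1.CronJob, js []batchv1.Job, jc jobControlInterface,
	sjc sjControlInterface, recorder record.EventRecorder) {
	// If neither history limit is set, there is nothing to do.
	if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
		return
	}

	failedJobs := []batchv1.Job{}
	successfulJobs := []batchv1.Job{}
	for _, job := range js {
		isFinished, finishedStatus := getFinishedStatus(&job)
		if isFinished && finishedStatus == batchv1.JobComplete {
			successfulJobs = append(successfulJobs, job)
		} else if isFinished && finishedStatus == batchv1.JobFailed {
			failedJobs = append(failedJobs, job)
		}
	}

	// Prune the oldest finished jobs beyond each history limit.
	if sj.Spec.SuccessfulJobsHistoryLimit != nil {
		removeOldestJobs(sj, successfulJobs, jc, *sj.Spec.SuccessfulJobsHistoryLimit, recorder)
	}
	if sj.Spec.FailedJobsHistoryLimit != nil {
		removeOldestJobs(sj, failedJobs, jc, *sj.Spec.FailedJobsHistoryLimit, recorder)
	}

	// Persist any changes made to the CronJob's active list while deleting jobs.
	if _, err := sjc.UpdateStatus(sj); err != nil {
		klog.Infof("Unable to update status for %s/%s (rv = %s): %v", sj.Namespace, sj.Name, sj.ResourceVersion, err)
	}
}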
  • The main scheduling function, syncOne

// syncOne reconciles a CronJob with the list of Jobs it created.
func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {
	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)

	// First walk the batchv1.Job slice for children of this cronjob and look for two anomalies:
	// unfinished jobs that are not in the Active list, and finished jobs that are still in it.
	// Record an event (UnexpectedJob / SawCompletedJob) for each and drop finished jobs from the Active list
	childrenJobs := make(map[types.UID]bool)
	for _, j := range js {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(*sj, j.ObjectMeta.UID)
		if !found && !IsJobFinished(&j) {
			recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name)
		} else if found && IsJobFinished(&j) {
			_, status := getFinishedStatus(&j)
			deleteFromActiveList(sj, j.ObjectMeta.UID)
			recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status)
		}
	}

	// Then check whether the Active list references jobs that are no longer children of this cronjob.
	// If so, record a MissingJob event and remove them from the Active list
	for _, j := range sj.Status.Active {
		if found := childrenJobs[j.UID]; !found {
			recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
			deleteFromActiveList(sj, j.UID)
		}
	}

	// Update the cronjob status
	updatedSJ, err := sjc.UpdateStatus(sj)
	if err != nil {
		klog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
		return
	}
	*sj = *updatedSJ

	// Check whether the cronjob is being deleted. If so, stop scheduling it
	if sj.DeletionTimestamp != nil {
		// The CronJob is being deleted.
		// Don't do anything other than updating status.
		return
	}

	// Check whether the cronjob is suspended. If so, stop scheduling it
	if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
		klog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
		return
	}

	// getRecentUnmetScheduleTimes walks the configured Unix cron table and returns the
	// schedule times that have come due but have not been satisfied yet, with some validity checks
	times, err := getRecentUnmetScheduleTimes(*sj, now)
	if err != nil {
		recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
		klog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
		return
	}
	// If no unmet schedule time was found, there is nothing to start this round
	if len(times) == 0 {
		klog.V(4).Infof("No unmet start times for %s", nameForLog)
		return
	}
	// If several schedule times are due, only the most recent one is started. If that time is already
	// more than StartingDeadlineSeconds in the past, mark it tooLate, record an event and skip this run
	if len(times) > 1 {
		klog.V(4).Infof("Multiple unmet start times for %s so only starting last one", nameForLog)
	}

	scheduledTime := times[len(times)-1]
	tooLate := false
	if sj.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds)).Before(now)
	}
	if tooLate {
		klog.V(4).Infof("Missed starting window for %s", nameForLog)
		recorder.Eventf(sj, v1.EventTypeWarning, "MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z))
		return
	}
	// Apply the configured concurrency policy to jobs that are still active
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 {
		klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog)
		return
	}
	if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent {
		for _, j := range sj.Status.Active {
			klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)

			job, err := jc.GetJob(j.Namespace, j.Name)
			if err != nil {
				recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
				return
			}
			if !deleteJob(sj, job, jc, recorder) {
				return
			}
		}
	}

	// Build the Job object from the JobTemplate in the CronJob spec; the Job name carries a hash
	// of scheduledTime (currently its Unix timestamp)
	jobReq, err := getJobFromTemplate(sj, scheduledTime)
	if err != nil {
		klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
		return
	}
	// Call CreateJob to create the new job from the template
	jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
	if err != nil {
		// If the namespace is being torn down, we can safely ignore
		// this error since all subsequent creations will fail.
		if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
			recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
		}
		return
	}
	klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)

	// Add the newly created Job to the Active list of the cronjob, set LastScheduleTime, and update the CronJob
	ref, err := getRef(jobResp)
	if err != nil {
		klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog)
	} else {
		sj.Status.Active = append(sj.Status.Active, *ref)
	}
	// LastScheduleTime is also what lets us monitor whether job scheduling meets expectations
	sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
	if _, err := sjc.UpdateStatus(sj); err != nil {
		klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
	}

	return
}
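As the comment above notes, the Job name is derived from scheduledTime. A paraphrased sketch of getJobFromTemplate and its getTimeHash helper (from the 1.17 utils, details approximate):

func getJobFromTemplate(sj *batchv1beta1.CronJob, scheduledTime time.Time) (*batchv1.Job, error) {
	labels := copyLabels(&sj.Spec.JobTemplate)
	annotations := copyAnnotations(&sj.Spec.JobTemplate)
	// The name is deterministic for a given nominal start time, so the same schedule slot
	// is never started twice: <cronjob-name>-<unix timestamp of scheduledTime>
	name := fmt.Sprintf("%s-%d", sj.Name, getTimeHash(scheduledTime))

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Labels:          labels,
			Annotations:     annotations,
			Name:            name,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(sj, controllerKind)},
		},
	}
	sj.Spec.JobTemplate.Spec.DeepCopyInto(&job.Spec)
	return job, nil
}

// getTimeHash is currently just the Unix timestamp of the scheduled time.
func getTimeHash(scheduledTime time.Time) int64 {
	return scheduledTime.Unix()
}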
  • getRecentUnmetScheduleTimes, the function that computes unmet schedule times from the cron table

// getRecentUnmetScheduleTimes walks the cron schedule from the last known start time up to now and
// returns the schedule times that have come due but have not been satisfied yet. If more than 100
// schedule times have been missed, this cronjob's scheduling is considered broken and an error is
// returned so that it stops being scheduled
func getRecentUnmetScheduleTimes(sj batchv1beta1.CronJob, now time.Time) ([]time.Time, error) {
	starts := []time.Time{}
	// Use robfig/cron to parse the cron schedule
	sched, err := cron.ParseStandard(sj.Spec.Schedule)
	if err != nil {
		return starts, fmt.Errorf("unparseable schedule: %s : %s", sj.Spec.Schedule, err)
	}

	var earliestTime time.Time
	// Determine the starting point: if the CronJob has run before, start from its last schedule time;
	// otherwise start from the CronJob's creation time
	if sj.Status.LastScheduleTime != nil {
		earliestTime = sj.Status.LastScheduleTime.Time
	} else {
		// If none found, then this is either a recently created scheduledJob,
		// or the active/completed info was somehow lost (contract for status
		// in kubernetes says it may need to be recreated), or that we have
		// started a job, but have not noticed it yet (distributed systems can
		// have arbitrary delays). In any case, use the creation time of the
		// CronJob as last known start time.
		earliestTime = sj.ObjectMeta.CreationTimestamp.Time
	}
	// If StartingDeadlineSeconds is set and now minus that deadline is later than earliestTime,
	// move earliestTime forward to the deadline: anything older is too late to start anyway
	if sj.Spec.StartingDeadlineSeconds != nil {
		schedulingDeadline := now.Add(-time.Second * time.Duration(*sj.Spec.StartingDeadlineSeconds))

		if schedulingDeadline.After(earliestTime) {
			earliestTime = schedulingDeadline
		}
	}
	// Nothing to schedule if the starting point is in the future
	if earliestTime.After(now) {
		return []time.Time{}, nil
	}

	// Collect every schedule time between the starting point and now.
	// If there are too many missed times (> 100), treat the cronjob's schedule as broken and give up
	for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
		starts = append(starts, t)
		if len(starts) > 100 {
			// We can't get the most recent times so just return an empty slice
			return []time.Time{}, fmt.Errorf("too many missed start time (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew")
		}
	}
	return starts, nil
}
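To make the >100 cutoff concrete, here is a small standalone illustration of my own (assuming the robfig/cron package the controller uses for ParseStandard): with a */1 schedule and a LastScheduleTime two hours in the past, the loop above blows past 100 unmet times almost immediately, and the cronjob stops being scheduled.

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	sched, err := cron.ParseStandard("*/1 * * * *")
	if err != nil {
		panic(err)
	}
	now := time.Now()
	earliest := now.Add(-2 * time.Hour) // pretend LastScheduleTime is two hours old
	count := 0
	for t := sched.Next(earliest); !t.After(now); t = sched.Next(t) {
		count++
	}
	fmt.Printf("unmet start times: %d\n", count) // ~120, well past the 100 cutoff
}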

That is essentially all of the main business logic, and on the whole it is simple and rather crude: an endless poll that walks the cronjobs one by one, computes which tasks are due, starts them, and synchronizes task status.

There are three potential problems observable in the code above (a rough back-of-envelope illustration follows the list):

  • syncAll uses pager.List to re-fetch every Job and CronJob in the cluster from the apiserver on every 10-second pass, which is a very redundant operation
  • Status changes are discovered by actively re-listing and re-checking every cronjob on every loop, instead of registering informer event callbacks and reacting only to actual changes
  • A cronjob that has missed more than 100 schedule times simply stops being scheduled, with only an event recorded; there is no self-healing path that drops the stale start times and carries on
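To get a feel for how the delay creeps in, a rough and purely illustrative calculation (the latencies are assumptions, not measurements from our cluster): each syncOne makes a few serial apiserver calls (status update, job creation, another status update), say 3 calls at ~40 ms each, so roughly 120 ms per cronjob. At 300 cronjobs one syncAll pass already costs 300 × 0.12 s ≈ 36 s, on top of paging through the full Job and CronJob lists. And since wait.Until only starts the next pass 10 s after the previous one returns, passes end up ~45-50 s apart, so cronjobs near the end of the iteration can easily see their */1 schedule slip past startingDeadlineSeconds.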

Tuning

First, try to replace pager.List with an informer watch. The idea is simple: the original code fetched every job/cronjob across all namespaces through the callback passed to pager.List; the new controller instead registers watch event handlers and lets the SharedInformer machinery that Kubernetes already provides deliver change events and cache the objects as the same structs.

Let’s take a look at the original pager.List:

	cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(opts)
	}
	err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		...
	})

Then extend the Controller with a workqueue and lister, and register the informer as follows:

type CronJobController struct {
	kubeClient clientset.Interface
	jobControl jobControlInterface
	sjControl  sjControlInterface
	podControl podControlInterface
	recorder   record.EventRecorder

	// Fields added after the refactor
	queue         workqueue.RateLimitingInterface
	cronjobSynced cache.InformerSynced
	syncHandler   func(key string) error
	cronjobLister batchv1beta1Lister.CronJobLister
}

// Then register the informer and declare the corresponding callbacks (add/update/delete) for the cronjob listener.
// cronjobInformer is assumed to come from a shared informer factory passed in by the caller, and
// NewController is assumed to have been updated to return the extended *CronJobController.
func NewCronJobController(kubeClient clientset.Interface, cronjobInformer batchv1beta1informers.CronJobInformer) (*CronJobController, error) {
	jm, err := NewController(kubeClient)
	if err != nil {
		return nil, err
	}
	jm.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob")
	cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// event handlers are filled in below
	})

	return jm, nil
}


The Kubernetes informer mechanism needs event handlers to tell it what to do for each kind of event, so we simply register a handler for add, update and delete.

So the skeleton above is filled in as follows:

func (jm *CronJobController) addCronjob(obj interface{}) {
	d := obj.(*batchv1beta1.CronJob)
	glog.V(4).Infof("Adding CronJob %s", d.Name)
	jm.enqueue(d)
}

func (jm *CronJobController) updateCronjob(old, cur interface{}) {
	oldC := old.(*batchv1beta1.CronJob)
	curC := cur.(*batchv1beta1.CronJob)
	glog.V(4).Infof("Updating CronJob %s", oldC.Name)
	jm.enqueue(curC)
}

func (jm *CronJobController) deleteCronjob(obj interface{}) {
	d, ok := obj.(*batchv1beta1.CronJob)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*batchv1beta1.CronJob)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a CronJob %#v", obj))
			return
		}
	}
	glog.V(4).Infof("Deleting CronJob %s", d.Name)
	jm.enqueue(d)
}

// Then register the informer and declare the corresponding callbacks (add/update/delete) for the cronjob listener
func NewCronJobController(kubeClient clientset.Interface, cronjobInformer batchv1beta1informers.CronJobInformer) (*CronJobController, error) {
	jm, err := NewController(kubeClient)
	if err != nil {
		return nil, err
	}
	jm.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob")

	// Register the event triggers
	cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addCronjob,
		UpdateFunc: jm.updateCronjob,
		DeleteFunc: jm.deleteCronjob,
	})

	// What should happen once a triggered cronjob is picked up?
	// Following the original logic, syncOne should be called for each cronjob.
	jm.cronjobLister = cronjobInformer.Lister()
	jm.cronjobSynced = cronjobInformer.Informer().HasSynced
	jm.syncHandler = jm.syncOne
	return jm, nil
}

So far the code above only declares the event triggers that need to be registered (enqueueing added, updated or deleted cronjobs onto the workqueue) and the event handler (syncOne).
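One piece the sketch leaves out is where cronjobInformer comes from. In this refactor it is built from a client-go shared informer factory and passed in by whatever wires up the controller; a minimal sketch of that wiring (illustrative only, not the actual kube-controller-manager plumbing; kubeClient and stopCh come from the surrounding context):

// Illustrative wiring only: build a shared informer factory, hand the CronJob informer
// to the controller, then start the factory and wait for the cache to sync.
factory := informers.NewSharedInformerFactory(kubeClient, 30*time.Second)
cronjobInformer := factory.Batch().V1beta1().CronJobs()

jm, err := NewCronJobController(kubeClient, cronjobInformer)
if err != nil {
	klog.Fatalf("failed to build cronjob controller: %v", err)
}

factory.Start(stopCh)
if !cache.WaitForCacheSync(stopCh, jm.cronjobSynced) {
	klog.Fatal("failed to wait for cronjob cache to sync")
}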

But the syncAll main function still has to do its job, so what I want is for it to block on the workqueue, take items off it and process them in FIFO order (there is no need to spawn a goroutine running syncOne per cronjob; that would only add complexity, and a single worker should have enough throughput here).

So the syncAll main function is changed to:

// Provide enqueue to put a batchv1beta1.CronJob onto the queue (by key)
func (jm *CronJobController) enqueue(cronjob *batchv1beta1.CronJob) {
	key, err := controller.KeyFunc(cronjob)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for cronjob %#v: %v", cronjob, err))
		return
	}
	jm.queue.Add(key)
}

// Provide a function that lists cronjobs from the informer cache and enqueues their keys onto the worker queue
func (jm *CronJobController) EnqueueCronjob() {
	cjobs, err := jm.cronjobLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("could not list all cronjobs: %v", err))
		return
	}
	for _, job := range cjobs {
		jm.enqueue(job)
	}
}

// In the syncAll main function, enqueue the watched cronjobs from a goroutine and call syncOne as soon as an item is available
func (jm *CronJobController) syncAll() {
	...
	go wait.Until(jm.EnqueueCronjob, time.Second*10, stopCh)

	key, quit := jm.queue.Get()
	// Get blocks until an item is available; quit becomes true once the queue is shut down (e.g. on SIGTERM/SIGKILL)
	if quit {
		return
	}
	defer jm.queue.Done(key)

	// Invoke the registered callback
	err := jm.syncHandler(key.(string))
	...
}

With that, the overall logic is roughly in place, but some checks still need to be added. For example, because the key is taken from a queue, the cronjob may already have been deleted by the time syncOne runs, so syncOne needs an extra validity check:

func syncOne(...) {
	...
	// The key from the queue is "namespace/name"; split it and re-fetch the cronjob from the lister cache
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	cronjob, err := jm.cronjobLister.CronJobs(namespace).Get(name)
	if errors.IsNotFound(err) {
		glog.V(2).Infof("CronJob %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}
	...
}
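Before summing up, one design note: the sketch above keeps everything inside syncAll, but the standard client-go controller shape (as in the sample-controller example) usually splits this into a Run method that starts workers plus a processNextWorkItem loop with rate-limited retries. A minimal sketch of that shape for comparison (my own illustration, not part of the refactor above):

// Run starts the worker goroutines and blocks until stopCh is closed.
func (jm *CronJobController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer jm.queue.ShutDown()

	if !cache.WaitForCacheSync(stopCh, jm.cronjobSynced) {
		return
	}
	for i := 0; i < workers; i++ {
		go wait.Until(jm.runWorker, time.Second, stopCh)
	}
	<-stopCh
}

func (jm *CronJobController) runWorker() {
	for jm.processNextWorkItem() {
	}
}

// processNextWorkItem takes one key off the queue, hands it to the sync handler,
// and requeues it with rate limiting if the handler returns an error.
func (jm *CronJobController) processNextWorkItem() bool {
	key, quit := jm.queue.Get()
	if quit {
		return false
	}
	defer jm.queue.Done(key)

	if err := jm.syncHandler(key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("error syncing cronjob %q: %v", key, err))
		jm.queue.AddRateLimited(key)
		return true
	}
	jm.queue.Forget(key)
	return true
}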

To sum up, we converted the pager.List approach into an informer watch. The overall refactored code is as follows:

type CronJobController struct {
	kubeClient clientset.Interface
	jobControl jobControlInterface
	sjControl  sjControlInterface
	podControl podControlInterface
	recorder   record.EventRecorder

	// Fields added after the refactor
	queue         workqueue.RateLimitingInterface
	cronjobSynced cache.InformerSynced
	syncHandler   func(key string) error
	cronjobLister batchv1beta1Lister.CronJobLister
}

func (jm *CronJobController) addCronjob(obj interface{}) {
	d := obj.(*batchv1beta1.CronJob)
	glog.V(4).Infof("Adding CronJob %s", d.Name)
	jm.enqueue(d)
}

func (jm *CronJobController) updateCronjob(old, cur interface{}) {
	oldC := old.(*batchv1beta1.CronJob)
	curC := cur.(*batchv1beta1.CronJob)
	glog.V(4).Infof("Updating CronJob %s", oldC.Name)
	jm.enqueue(curC)
}

func (jm *CronJobController) deleteCronjob(obj interface{}) {
	d, ok := obj.(*batchv1beta1.CronJob)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*batchv1beta1.CronJob)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a CronJob %#v", obj))
			return
		}
	}
	glog.V(4).Infof("Deleting CronJob %s", d.Name)
	jm.enqueue(d)
}

// Then register the informer and declare the corresponding callbacks (add/update/delete) for the cronjob listener
func NewCronJobController(kubeClient clientset.Interface, cronjobInformer batchv1beta1informers.CronJobInformer) (*CronJobController, error) {
	jm, err := NewController(kubeClient)
	if err != nil {
		return nil, err
	}
	jm.queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "cronjob")

	// Register the event triggers
	cronjobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jm.addCronjob,
		UpdateFunc: jm.updateCronjob,
		DeleteFunc: jm.deleteCronjob,
	})

	// What should happen once a triggered cronjob is picked up?
	// Following the original logic, syncOne should be called for each cronjob.
	jm.cronjobLister = cronjobInformer.Lister()
	jm.cronjobSynced = cronjobInformer.Informer().HasSynced
	jm.syncHandler = jm.syncOne
	return jm, nil
}

// Provide enqueue to put a batchv1beta1.CronJob onto the queue (by key)
func (jm *CronJobController) enqueue(cronjob *batchv1beta1.CronJob) {
	key, err := controller.KeyFunc(cronjob)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for cronjob %#v: %v", cronjob, err))
		return
	}
	jm.queue.Add(key)
}

// Provide a function that lists cronjobs from the informer cache and enqueues their keys onto the worker queue
func (jm *CronJobController) EnqueueCronjob() {
	cjobs, err := jm.cronjobLister.List(labels.Everything())
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("could not list all cronjobs: %v", err))
		return
	}
	for _, job := range cjobs {
		jm.enqueue(job)
	}
}

// In the syncAll main function, enqueue the watched cronjobs from a goroutine and call syncOne as soon as an item is available
func (jm *CronJobController) syncAll() {
	...
	go wait.Until(jm.EnqueueCronjob, time.Second*10, stopCh)

	key, quit := jm.queue.Get()
	// Get blocks until an item is available; quit becomes true once the queue is shut down (e.g. on SIGTERM/SIGKILL)
	if quit {
		return
	}
	defer jm.queue.Done(key)

	// Invoke the registered callback
	err := jm.syncHandler(key.(string))
	...
}

func syncOne(...) {
	...
	// The key from the queue is "namespace/name"; split it and re-fetch the cronjob from the lister cache
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	cronjob, err := jm.cronjobLister.CronJobs(namespace).Get(name)
	if errors.IsNotFound(err) {
		glog.V(2).Infof("CronJob %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}
	...
}