Preface
Recently the company asked us to tighten control over the K8s cluster resources used by one of our projects in order to avoid waste.
The project is fairly resource-hungry: each Pod needs three CPU cores and two gigabytes of memory. At the beginning there was no flexible scheduling, so Pods kept running even when there were no tasks to process. We could lower the resource Requests and Limits in K8s, but that would still leave resources sitting idle.
Introduction
Before getting into the main text, let me introduce the overall process so that the rest of the article is easier to follow.
First, another department inserts a record into the database. The scheduler scans the database periodically, and whenever it finds a new record it calls the K8s API to create a Job resource. The Job contains a Pod, the Pod performs some tasks, and then it is done.
It looks simple, but there are a few caveats:
- The Pod needs environment variables, but the Pod is created by the scheduler, so the variables have to be passed down step by step
- The scheduler cannot change any data; it only reads from the database, for better decoupling. The scheduler should not be concerned with any business logic or data
- The scheduler itself cannot hold any state, because once state is involved it has to be stored somewhere, and the scheduler itself may be restarted at any time. That would only add to the burden
- You need to check whether the current cluster has enough resources to start the Pod
Getting started
The scheduler is developed in Go, so Go will be used as the main language for the rest of this article.
Create a debuggable K8S environment
Since we develop in Go, we use the official client-go library for K8s. The library provides several ways to create a clientset (think of a clientset as a channel for communicating with the current cluster's master).
package main

import (
    "fmt"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // This method reads the cluster's own configuration when running inside K8s
    kubeConfig, err := rest.InClusterConfig()
    if err != nil {
        // In a development environment, use the local minikube instead.
        // The KUBECONFIG variable needs to be configured;
        // for minikube it can point to $HOME/.kube/config
        kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
            clientcmd.NewDefaultClientConfigLoadingRules(),
            &clientcmd.ConfigOverrides{}).ClientConfig()
        // KUBECONFIG is not configured and we are not running inside a cluster
        if err != nil {
            panic("get k8s config fail: " + err.Error())
        }
    }

    // Create the clientset
    clientset, err := kubernetes.NewForConfig(kubeConfig)
    if err != nil {
        panic("failed to create k8s clientset: " + err.Error())
    }
    // Created successfully
    fmt.Println(clientset)
}
The rest.InClusterConfig() code is very simple: it reads the token and CA certificate from /var/run/secrets/kubernetes.io/serviceaccount/ on the current machine, reads the KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT environment variables, and puts them together into a config.
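To make that concrete, here is a rough, hand-written sketch of the kind of config that gets assembled. It is only an illustration of the mechanism just described (the real implementation also handles token rotation and more careful error cases); rest.Config and its fields are real, but the helper name is mine:
import (
    "net"
    "os"

    "k8s.io/client-go/rest"
)

// sketchInClusterConfig illustrates roughly what rest.InClusterConfig assembles.
// Real code should just call rest.InClusterConfig() directly.
func sketchInClusterConfig() (*rest.Config, error) {
    const saPath = "/var/run/secrets/kubernetes.io/serviceaccount"
    token, err := os.ReadFile(saPath + "/token") // os.ReadFile needs Go 1.16+
    if err != nil {
        return nil, err
    }
    host := os.Getenv("KUBERNETES_SERVICE_HOST")
    port := os.Getenv("KUBERNETES_SERVICE_PORT")
    return &rest.Config{
        Host:        "https://" + net.JoinHostPort(host, port),
        BearerToken: string(token),
        TLSClientConfig: rest.TLSClientConfig{
            CAFile: saPath + "/ca.crt",
        },
    }, nil
}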
rest.InClusterConfig() only works when the code is running inside the cluster, not in a local development environment, so we need another way to handle the local case.
As you can see in the code above, I have already handled this: when InClusterConfig fails, the following code is executed instead:
kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{}).ClientConfig()
This code is also relatively simple: it reads the KUBECONFIG environment variable to obtain the path of the local K8s configuration, and if that is not set it falls back to the .kube/config file in the current user's home directory. The required configuration is then built from that file. The relevant source is in NewDefaultClientConfigLoadingRules and ClientConfig.
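If you would rather be explicit about the path instead of relying on the default loading rules, client-go also provides clientcmd.BuildConfigFromFlags. A minimal sketch under that alternative, assuming the kubeconfig lives at $KUBECONFIG or $HOME/.kube/config:
import (
    "os"
    "path/filepath"

    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)

// loadLocalConfig builds a rest.Config from an explicit kubeconfig path,
// falling back to $HOME/.kube/config when KUBECONFIG is not set.
func loadLocalConfig() (*rest.Config, error) {
    path := os.Getenv("KUBECONFIG")
    if path == "" {
        home, err := os.UserHomeDir()
        if err != nil {
            return nil, err
        }
        path = filepath.Join(home, ".kube", "config")
    }
    return clientcmd.BuildConfigFromFlags("", path)
}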
Now, as long as the local minikube environment works, you can debug and develop normally.
The above approach is borrowed from Rook.
Create a Job and Pod
I will not elaborate on the database query here; adapt it to your own business and development needs. The database is just used to get things started: it does not have to be a database at all, it could be anything else, whatever fits your business.
Let's say we get a record from the database and need to pass its values to the Pod, so that the Pod does not have to query the database again. Now we need to define the Job:
import (
    "fmt"

    uuid "github.com/satori/go.uuid" // uuid library assumed; NewV4 here returns a single value (older satori/go.uuid API)
    batchv1 "k8s.io/api/batch/v1"
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Job required configuration
type jobsSpec struct {
    Namespace string
    Image     string
    Prefix    string
}

// Returns the specified CPU and memory resource values
// See: https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu, memory string) apiv1.ResourceList {
    res := apiv1.ResourceList{}
    if cpu != "" {
        res[apiv1.ResourceCPU] = resource.MustParse(cpu)
    }
    if memory != "" {
        res[apiv1.ResourceMemory] = resource.MustParse(memory)
    }
    return res
}

// Returns a ResourceRequirements object, see the getResourceList comment for details
func getResourceRequirements(requests, limits apiv1.ResourceList) apiv1.ResourceRequirements {
    res := apiv1.ResourceRequirements{}
    res.Requests = requests
    res.Limits = limits
    return res
}

// Convert to pointer
func newInt64(i int64) *int64 {
    return &i
}

// Create the Job configuration
func (j *jobsSpec) Create(envMap map[string]string) *batchv1.Job {
    u2 := uuid.NewV4().String()[:8]
    name := fmt.Sprint(j.Prefix, "-", u2)
    return &batchv1.Job{
        ObjectMeta: metav1.ObjectMeta{
            Name:      name,
            Namespace: j.Namespace,
        },
        Spec: batchv1.JobSpec{
            Template: apiv1.PodTemplateSpec{
                Spec: apiv1.PodSpec{
                    RestartPolicy: "Never",
                    Containers: []apiv1.Container{
                        {
                            Name:            name,
                            Image:           j.Image,
                            Env:             EnvToVars(envMap),
                            ImagePullPolicy: "Always",
                            Resources: getResourceRequirements(
                                getResourceList("2500m", "2048Mi"),
                                getResourceList("3000m", "2048Mi")),
                        },
                    },
                },
            },
        },
    }
}
There is nothing much to discuss here: it is just the resource definition, there is no real barrier to it, and the comments explain the details.
The code above is still missing the part that injects the variables, EnvToVars. The core code is as follows:
import (
    "os"
    "strings"

    v1 "k8s.io/api/core/v1"
)

// Convert the map into the environment-variable format accepted by K8s
func EnvToVars(envMap map[string]string) []v1.EnvVar {
    var envVars []v1.EnvVar
    for k, v := range envMap {
        envVar := v1.EnvVar{
            Name:  k,
            Value: v,
        }
        envVars = append(envVars, envVar)
    }
    return envVars
}

// Get all the variables of the current process environment and convert them to a map
func GetAllEnvToMap() map[string]string {
    item := make(map[string]string)
    for _, k := range os.Environ() {
        // SplitN keeps values that themselves contain "="
        splits := strings.SplitN(k, "=", 2)
        item[splits[0]] = splits[1]
    }
    return item
}

// Merge two maps. For better performance, use a closure so that sourceMap only needs to be passed once
func MergeMap(sourceMap map[string]string) func(insertMap map[string]string) map[string]string {
    return func(insertMap map[string]string) map[string]string {
        for k, v := range insertMap {
            sourceMap[k] = v
        }
        return sourceMap
    }
}
Then it is used like this:
job := jobsSpec{
    Prefix:    "project-" + "dev" + "-job",
    Image:     "docker image name",
    Namespace: "default",
}
willMergeMap := MergeMap(GetAllEnvToMap())
// dbData is the data obtained from the database, in the following format:
// [ { id: 1, url: 'xxx' }, { id: 2, url: 'yyy' } ]
for _, data := range dbData {
    currentEnvMap := willMergeMap(data)
    // Create the Job (CreateJob is a wrapper that is not shown in this article)
    _, err := api.CreateJob(currentEnvMap)
    if err != nil {
        panic("create job fail: " + err.Error())
    }
}
This passes both the scheduler's current environment variables and the database record to the Pod as environment variables. You only need to make sure the scheduler itself carries the variables the Pod will use, such as the S3 token, DB host, and so on. The Pod then has to prepare almost nothing itself; everything it needs is handed to it by the scheduler, giving a clear division of labor.
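On the Pod side, the container simply reads these variables when it starts. A minimal sketch of what that might look like; the variable names used here (id, url, DB_HOST) are only illustrative and depend on what your scheduler actually injects:
package main

import (
    "fmt"
    "os"
)

func main() {
    // These names are hypothetical; they mirror the database fields and
    // scheduler environment injected above.
    id := os.Getenv("id")
    url := os.Getenv("url")
    dbHost := os.Getenv("DB_HOST")

    fmt.Println("processing record", id, "from", url, "db:", dbHost)
    // ... do the real work here, then update the record's status so the
    // scheduler no longer treats it as new
}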
Optimization
In fact, the code above already covers the core of the scheduler; it is not particularly difficult and the logic is simple. But that is not enough: there is still a lot more to consider.
Resource check
The scheduler cannot change any data, only the container inside the Pod can change the data.
So there’s a problem here.
If the cluster does not have enough free resources, the Pod will stay Pending. As described above, the variables are injected into the Pod, but the container never starts, so the data is never changed, and the unchanged record keeps being treated as new by the scheduler. As a result, yet another Job is started for the same record, and the loop continues until the cluster has enough resources for one of the Pods to finally change the data.
For example, suppose there is a status field in the database. When its value is "waiting", the scheduler treats the record as new data, converts it into environment variables and injects them into a Pod, and the Pod then changes "waiting" to "process". The scheduler scans the data every three minutes, so the Pod must make that change within three minutes.
When resources are insufficient, K8s creates the Pod, but the code inside it never runs, so the data is never changed, and the scheduler keeps creating Pods for the same record.
The solution is simply to check whether any of our Pods are in the Pending state, and if so, stop creating new ones. Here is the core code:
func HavePendingPod() (bool, error) {
    // Get all the Pods under the current namespace
    pods, err := clientset.CoreV1().Pods(Namespace).List(metaV1.ListOptions{})
    if err != nil {
        return false, err
    }
    // Loop through the Pods; if one that matches the current prefix is Pending,
    // the current environment has a Pending Pod
    for _, v := range pods.Items {
        phase := v.Status.Phase
        if phase == "Pending" {
            if strings.HasPrefix(v.Name, Prefix) {
                return true, nil
            }
        }
    }
    return false, nil
}
When it returns true, no Job is created.
Maximum number of jobs
Cluster resources are not infinite. The Pending case is handled, but that is only a defensive measure; we still need to control the number of Jobs. Once the number of Jobs reaches a certain value, we stop creating new ones. The code is also very simple, so I will only show the part that gets the number of Jobs in the current environment:
// Get the Job items of the same environment under the current namespace
func GetJobListByNS() ([]v1.Job, error) {
    jobList, err := clientset.BatchV1().Jobs(Namespace).List(metaV1.ListOptions{})
    if err != nil {
        return nil, err
    }
    // Filter out Jobs with a different prefix
    var item []v1.Job
    for _, v := range jobList.Items {
        if strings.HasPrefix(v.Name, Prefix) {
            item = append(item, v)
        }
    }
    return item, nil
}

func GetJobLenByNS() (int, error) {
    jobItem, err := api.GetJobListByNS()
    if err != nil {
        // On error, return the maximum (MaxJobCount is assumed to be the configured
        // ceiling) so that the caller will not create any new Jobs
        return MaxJobCount, err
    }
    return len(jobItem), nil
}
Delete completed and failed Jobs
There is still a problem with the code above: the K8s Job resource does not delete itself when it completes or fails, so its record stays around afterwards. That means the count above also includes completed and failed Jobs, and you end up in the awkward situation of never being able to create a new Job.
There are two solutions. The first is to add the spec.ttlSecondsAfterFinished property to the Job declaration so that K8s automatically recycles completed and failed Jobs; unfortunately this attribute is only available in newer versions, and our cluster runs an older one. The second method is to call the API to delete completed and failed Jobs before fetching the count, as shown after the short sketch below.
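For clusters new enough to support it, the first solution would look roughly like this. It is only a sketch under that assumption: TTLSecondsAfterFinished is the corresponding field on batchv1.JobSpec, and newInt32 is a small helper analogous to the newInt64 shown earlier.
import (
    batchv1 "k8s.io/api/batch/v1"
)

// newInt32 converts a literal to a pointer, mirroring newInt64 above
func newInt32(i int32) *int32 {
    return &i
}

// withTTL asks K8s to delete the Job (and, with it, its Pods) the given number
// of seconds after it finishes. Requires a cluster version that supports
// ttlSecondsAfterFinished.
func withTTL(job *batchv1.Job, seconds int32) *batchv1.Job {
    job.Spec.TTLSecondsAfterFinished = newInt32(seconds)
    return job
}
It could be applied right after building the Job, for example withTTL(job.Create(currentEnvMap), 300). Since our cluster does not support it, the second method is what we actually use; its core code follows: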
func DeleteCompleteJob() error {
    jobItem, err := GetJobListByNS()
    if err != nil {
        return err
    }
    // If this propagation policy is not specified, the Pods will not be deleted
    // when the Job is deleted
    propagationPolicy := metaV1.DeletePropagationForeground
    for _, v := range jobItem {
        // Delete only finished Jobs
        if v.Status.Failed == 1 || v.Status.Succeeded == 1 {
            err := clientset.BatchV1().Jobs(Namespace).Delete(v.Name, &metaV1.DeleteOptions{
                PropagationPolicy: &propagationPolicy,
            })
            if err != nil {
                return err
            }
        }
    }
    return nil
}
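To tie everything together, here is a hedged sketch of what one scan cycle of the scheduler could look like when all the checks above are combined. The names maxJobCount, queryWaitingRows and api.CreateJob are placeholders for your own configuration, database layer and wrappers:
// runOnce is one scan cycle of the scheduler (run, for example, every three minutes).
func runOnce(maxJobCount int) error {
    // 1. Clean up finished Jobs so they do not distort the count below
    if err := DeleteCompleteJob(); err != nil {
        return err
    }
    // 2. If one of our Pods is still Pending, the cluster lacks resources; skip this cycle
    pending, err := HavePendingPod()
    if err != nil {
        return err
    }
    if pending {
        return nil
    }
    // 3. Respect the maximum number of concurrent Jobs
    count, err := GetJobLenByNS()
    if err != nil {
        return err
    }
    if count >= maxJobCount {
        return nil
    }
    // 4. Create a Job for every new record found in the database
    willMergeMap := MergeMap(GetAllEnvToMap())
    for _, row := range queryWaitingRows() { // placeholder: returns []map[string]string
        if _, err := api.CreateJob(willMergeMap(row)); err != nil {
            return err
        }
    }
    return nil
}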
Conclusion
The whole scheduler is relatively simple; there is no need to split it out into a separate library. As long as you understand the general idea, you can build a scheduler that fits your own project team's needs.
Thanks to @qqshfox for his ideas