Recently, the company’s data synchronization project (hereinafter referred to as ZDTP) needed distributed scheduling of its data synchronization execution units. The current scheme is to package the data synchronization execution unit into a Docker image and use K8s for scheduling.

In ZDTP, the act of data synchronization can be abstracted into an execution unit (hereinafter referred to as a Worker), similar to the thread execution unit Runnable: a Runnable is put into a queue to wait to be scheduled onto a thread, and once it has run, its mission is complete. When a user creates a synchronization task in the ZDTP console and starts it, several Workers are generated according to the task’s configuration. If these Workers were executed locally, each could be wrapped in a Runnable and run on a newly created thread, as shown in the following figure:

However, single-machine mode runs into a performance bottleneck. At that point, distributed scheduling is needed to dispatch Workers to other machines for execution:

The question is: how can we better schedule Workers to execute on other machines?

Research on Worker deployment modes

1. Deploy workers based on virtual machines

Workers run in pre-created virtual machines, and when a task starts an idle Worker must be selected according to the current Worker load, which means the Worker runs in the form of an Agent, as shown in the following figure:

The main disadvantages are as follows:

  1. The number of Worker Agents is relatively fixed, so VM creation is costly and scaling in or out is inconvenient.
  2. Task running status depends on a ZK monitoring mechanism; if a task fails while running, failover and automatic restart mechanisms have to be implemented, which lengthens the development cycle.
  3. The Worker Agent load-acquisition logic has to be implemented by the project itself, and it is difficult to obtain load information accurately, which also lengthens the development cycle.

2. Deploy Worker based on K8s

Package Workers into a Docker image and use K8s to schedule the Worker containers, with each Worker running only one task, as shown in the following figure:

The advantages of using K8s are as follows:

  1. A Worker container scheduled by the K8s cluster has built-in fault recovery: as long as the Pod’s restartPolicy is set to Always, K8s will automatically try to restart the Worker container no matter what happens to it while running, which greatly reduces operation and maintenance costs and improves the high availability of data synchronization.
  2. Load balancing comes automatically. For example, when a node’s load is high, Worker containers are scheduled to nodes with lower load; more importantly, when a node breaks down, K8s automatically transfers its workload to other nodes.
  3. The Worker’s life cycle is fully managed by the K8s cluster: the Worker’s running status can be obtained simply by invoking the relevant APIs, which greatly shortens the development cycle.

K8s container scheduling scheme research

The scheduling object of a K8s cluster is the Pod, and there are many ways to schedule it; the following are the main methods introduced here:

1. Deployment (fully automatic scheduling)

Before talking about Deployment, let’s talk about ReplicaSet, a very important concept in K8s and a higher-level abstraction than the Pod. In general we use the Deployment abstraction to manage the real application, and the Pod is the smallest unit in a Deployment. A ReplicaSet defines a group of Pods (such as Pods packed with ZDTP Worker containers) and keeps them at the expected replica count at all times: when the K8s cluster finds during its regular inspection that the number of replicas of a Pod is lower than the expected value set in the ReplicaSet, it creates new Pod instances from the Pod template defined in the ReplicaSet to bring the number of Pods back to the expected value. It is precisely this characteristic of ReplicaSet that provides cluster high availability while reducing operation and maintenance costs.

A Deployment uses a ReplicaSet under the hood; the two are highly similar, so a Deployment can be regarded as an upgraded version of the ReplicaSet.
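In manifest form, a minimal Deployment for scheduling ZDTP Worker containers might look like the sketch below; the name, labels, and image are assumptions for illustration, not the project’s actual configuration:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zdtp-worker                # hypothetical name
spec:
  replicas: 2                      # the expected Pod count the underlying ReplicaSet maintains
  selector:
    matchLabels:
      app: zdtp-worker
  template:
    metadata:
      labels:
        app: zdtp-worker
    spec:
      restartPolicy: Always        # Deployment/ReplicaSet only allows Always
      containers:
      - name: worker
        image: zdtp/worker:latest  # assumed image name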

2. Job (Batch scheduling)

We can use the K8s Job resource object to define and start a batch task that processes a batch of work items either in parallel or serially; when they are done, the task finishes.

1) Job Template Expansion mode

Given how the ZDTP Worker operates, we can have one Job correspond to one Worker and create as many Jobs as there are Workers. The Pod is restarted only if it exits abnormally; after a normal run the Job simply finishes, as shown in the following figure:
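As a sketch of this mode, one Job per Worker could be defined roughly as follows; the name, image, and the argument selecting which Worker to run are all assumptions:

apiVersion: batch/v1
kind: Job
metadata:
  name: zdtp-worker-001            # one Job per Worker; hypothetical name
spec:
  template:
    spec:
      restartPolicy: OnFailure     # restart the Pod only when it exits abnormally
      containers:
      - name: worker
        image: zdtp/worker:latest  # assumed image
        args: ["--worker-id=001"]  # hypothetical flag telling the container which Worker to execute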

2) Queue with Pod Per Work Item

In this mode, the Workers generated by the client are stored in a queue, and only one Job is created to consume the Worker items in the queue. The parallelism parameter sets how many Worker Pods may process Workers simultaneously. The Worker’s handler logic in this mode simply pulls one Worker from the queue, processes it, and exits immediately. The completions parameter controls the number of Pods that must exit normally: when the number of normally exited Pods reaches completions, the Job ends, so completions also controls the number of Workers the Job processes. As shown below:
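In manifest form this mode might look like the sketch below; the counts are examples, and the name and image are assumptions (the image is presumed to pull one Worker from the queue, process it, and exit):

apiVersion: batch/v1
kind: Job
metadata:
  name: zdtp-worker-queue              # hypothetical name
spec:
  completions: 8                       # the Job ends after 8 Pods exit normally, i.e. 8 Workers processed
  parallelism: 2                       # at most 2 Worker Pods consume the queue simultaneously
  template:
    spec:
      restartPolicy: OnFailure
      containers:
      - name: worker
        image: zdtp/worker-queue:latest  # assumed image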

3) Queue with Variable Pod Count

Unlike Queue with Pod Per Work Item mode, here the Job is considered to have finished processing the data as soon as one Pod exits normally. Each Pod contains logic that checks whether any Worker remains in the queue; once it finds the queue empty, the Pod exits normally. Therefore, in Queue with Variable Pod Count mode, completions is effectively 1 (the official work-queue example simply leaves .spec.completions unset, so that the first Pod to succeed completes the Job), and parallelism controls how many Worker Pods process Workers at the same time.

This mode also requires the queue to be able to tell a Pod whether any Workers remain. Messaging middleware such as RocketMQ/Kafka cannot do this and will only keep the client waiting, so RocketMQ/Kafka cannot be used in this mode. As shown below:
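A sketch of this mode follows, modeled on the upstream fine-grained work-queue example: .spec.completions is left unset, so the first Pod that exits normally (the first to find the queue empty) completes the Job. The name and image are assumptions:

apiVersion: batch/v1
kind: Job
metadata:
  name: zdtp-worker-drain            # hypothetical name
spec:
  parallelism: 3                     # 3 Worker Pods drain the queue concurrently
  # completions deliberately unset: any Pod that succeeds marks the Job complete
  template:
    spec:
      restartPolicy: OnFailure
      containers:
      - name: worker
        image: zdtp/worker-drain:latest  # assumed image that loops until the queue is empty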

Of course, if Workers later need to run on a schedule, K8s CronJob (scheduled task scheduling) is a very good choice.
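A CronJob simply wraps a Job template with a cron expression; a minimal sketch, with the schedule, name, and image all assumed, could be:

apiVersion: batch/v1                 # batch/v1beta1 on clusters older than K8s 1.21
kind: CronJob
metadata:
  name: zdtp-worker-cron             # hypothetical name
spec:
  schedule: "0 2 * * *"              # e.g. run the Worker every day at 02:00
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
          - name: worker
            image: zdtp/worker:latest  # assumed image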

3. Pod (Default scheduling)

This mode starts the container directly with kind=Pod. The number of running instances of the container cannot be set in this mode, i.e., replicas is always 1. Production application clusters generally do not start containers this way, because this mode has no automatic Pod scaling capability.
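For reference, a directly created Pod is just the following (name and image assumed):

apiVersion: v1
kind: Pod
metadata:
  name: zdtp-worker              # hypothetical name
spec:
  restartPolicy: Always          # a bare Pod may use Always, OnFailure, or Never
  containers:
  - name: worker
    image: zdtp/worker:latest    # assumed image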

It is worth mentioning that even if you only want a single replica of a Pod, the official recommendation is still to deploy it via a ReplicaSet.

Pod restart policy analysis

Pod restart policies include Always, OnFailure, and Never:

  • Always: K8s automatically restarts the container whenever it terminates, regardless of exit code.
  • OnFailure: K8s automatically restarts the container when it stops running with a non-zero exit code.
  • Never: K8s never restarts the container, regardless of its running state.

A Deployment/ReplicaSet must set restartPolicy to Always (both need to maintain the expected number of Pod replicas), while a Job can only be set to OnFailure or Never, which ensures the container does not restart after it finishes running. A Pod started directly can use any of the three restart policies.

One caveat here is that the situation is slightly more complicated with Job:

1) Pod restartPolicy=Never

If a Pod exits abnormally during Job scheduling, its container is not restarted, but the Job only completes once enough Pods have finished successfully (completions is at least 1). So although an abnormally exited Pod is never restarted, the Job keeps creating replacement Pods until the number of normally completed Pods reaches completions:

$ kubectl get pod --namespace zdtp-namespace

NAME                   READY   STATUS               RESTARTS   AGE
zdtp-worker-hc6ld      0/1     ContainerCannotRun   0          64s
zdtp-worker-hfblk      0/1     ContainerCannotRun   0          60s
zdtp-worker-t9f6v      0/1     ContainerCreating    0          11s
zdtp-worker-v2g7s      0/1     ContainerCannotRun   0          31s

2) Pod restartPolicy=OnFailure

If restartPolicy=OnFailure, the Pod’s container is restarted repeatedly until it executes successfully, and the Job does not create replacement Pods, as follows:

$ kubectl get pod --namespace zdtp-namespace

NAME                READY   STATUS             RESTARTS   AGE
zdtp-worker-5tbxw   0/1     CrashLoopBackOff   5          67s

How do I select a K8s scheduling policy?

Having studied the K8s scheduling schemes and Pod restart policies above, we now need to select an appropriate scheduling method according to the project’s scheduling requirements.

1. Incremental synchronization Worker

An incremental synchronization Worker keeps running for as long as the task exists, which means the Pod restartPolicy must be Always, so the only options are Deployment scheduling or creating the Pod directly. Deployment is recommended, however: it has been officially stated that Deployment is still recommended even when there is only one Pod replica.

2. Full synchronization Worker

A full synchronization Worker exits after data synchronization completes, so it seems that either Job scheduling or directly creating a Pod would do. However, at this stage full synchronization does not yet record its progress, so if anything goes wrong during the run the container must not restart automatically after exiting. The current practice is that when a Worker exits abnormally during execution, the user must delete the already synchronized resources and then manually start the Worker to run the full synchronization again.

Therefore, a Job is not suitable for scheduling full synchronization Worker Pods at present. At this stage, a full synchronization Worker can only be deployed as a directly created Pod, with the Pod restartPolicy set to Never.
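Concretely, the full synchronization Worker would be created as a bare Pod roughly like this (name and image are assumptions); with restartPolicy: Never, the container is never restarted after exiting, whether the exit is normal or abnormal:

apiVersion: v1
kind: Pod
metadata:
  name: zdtp-full-sync-worker    # hypothetical name
spec:
  restartPolicy: Never           # the container must not restart after any exit
  containers:
  - name: worker
    image: zdtp/full-sync-worker:latest  # assumed image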

About the author

The author, Zhang Chenghui, specializes in messaging middleware and maintains his company’s million-TPS Kafka cluster. He runs the WeChat public account “Back-end Advanced”, where he irregularly shares hands-on summaries and detailed source-code analyses of Kafka and RocketMQ rather than bare concepts. He is also a Contributor to Seata, Alibaba’s open-source distributed transaction framework, and shares his knowledge of Seata there as well. The account also covers web-related topics such as the whole Spring suite. The content may not be exhaustive, but it will certainly show you how seriously the author takes technology!

WeChat public account: Back-end Advanced

Tech blog: objcoding.com/

GitHub: github.com/objcoding/