Introduction to Kubernetes

What is Kubernetes?

Most of us are familiar with Kubernetes by now: for nearly two years the industry has been discussing cloud native, and discussing Kubernetes. So what is Kubernetes?

  • K8s is a resource management system.

If you are familiar with Yarn or Mesos: after deploying such a resource management system on a batch of bare physical machines, you can develop distributed software or applications against its API or SDK. For example, traditional MapReduce is developed on Yarn; on K8s you can develop distributed web servers as well as big data computing tasks.

  • K8s is a container orchestration system.

Unlike traditional Yarn, K8s runs everything containerized, and the containers are not only Docker containers: runtimes such as rkt (Rocket) provide related isolation as well, and where a production environment has high requirements there may be secure containers such as Kata Containers. The applications K8s deploys on its Slave nodes are distributed and managed in containerized form, isolated by container technology.

  • K8s is an automated operation and maintenance system.

It exposes a declarative API: we only tell K8s that the cluster needs a Deployment, set the number of replicas, and state what condition it should reach, and the scheduling system (K8s) maintains that state until it is reached. If a failure occurs, it automatically migrates the task to another machine so that the declared state stays satisfied.
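For example, a minimal sketch of that declarative flow (the application name and image are illustrative, not Flink-specific): declare three replicas, apply, and let K8s converge to that state.

kubectl apply -f - <<'EOF'
apiVersion: apps/v1
kind: Deployment
metadata:
  name: demo-app          # illustrative name
spec:
  replicas: 3             # desired state: three replicas
  selector:
    matchLabels:
      app: demo-app
  template:
    metadata:
      labels:
        app: demo-app
    spec:
      containers:
      - name: demo-app
        image: nginx:1.19   # illustrative image
EOF

# If a replica dies, the Deployment controller starts a replacement
# to restore the declared count of three.
kubectl get pods -l app=demo-app --watch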

  • Cloud native.

At present almost all cloud vendors provide K8s services, including Alibaba Cloud in China and Amazon and Google internationally; even the traditionally on-premises Microsoft offers both managed and unmanaged K8s services. With the rise of Lambda and Function Compute, the Serverless approach is becoming more popular. Besides the traditional model of deploying small clusters, a new model is emerging: the cloud provides the manager, a large Serverless cluster is built on top of it, and users are charged on demand for the computing resources they use.

Kubernetes architecture

Kubernetes architecture diagram

The diagram above shows the basic architecture of K8s, which is a very typical master-slave architecture.

  1. The Master consists of the Controller, the API Server, the Scheduler, and etcd for storage. Etcd can be counted as part of the Master or treated as storage independent of it. The Controller, API Server, and Scheduler each run as separate processes; this differs a little from Yarn, whose entire Master runs as a single process. Multiple K8s Masters can also perform leader election among themselves, with the active Master serving requests.
  2. The Slave side mainly includes Kube Proxy, Kubelet, Docker, and related components. The components deployed on each Node are similar, and they manage the Pods running on that node.
  3. Users can submit tasks to K8s through whichever interface they prefer: the Dashboard web UI that K8s provides, or the Kubectl command line.

Some concepts of Kubernetes

  • ConfigMap

ConfigMap is a key-value data structure. The common usage is to mount a ConfigMap into a Pod as configuration files for the processes in the Pod. In Flink, you can write the Log4j file and the flink-conf file into a ConfigMap and mount it into the Pod before the JobManager or TaskManager comes up. The JobManager then reads the corresponding conf files, loads the configuration, and correctly pulls up the JobManager components.
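A minimal sketch (the file and ConfigMap names are illustrative) of packing the configuration into a ConfigMap:

kubectl create configmap flink-config \
  --from-file=flink-conf.yaml \
  --from-file=log4j.properties

# In the Pod spec the ConfigMap is then mounted as files, roughly:
#   volumes:
#   - name: flink-config-volume
#     configMap:
#       name: flink-config
# and mounted at /opt/flink/conf via volumeMounts.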

  • Service (SVC)

A Service is a way to expose a service to the outside world. If a service running inside K8s needs to be accessed from outside the cluster, you can use a Service and expose it as a LoadBalancer or NodePort.

If you have a Service that you don’t want or need to expose, you can keep it at ClusterIP, or set it to None for headless mode. It can then be used to interconnect services inside the cluster, such as a traditional front end reaching its back-end services, or, in Flink’s non-HA case, TaskManagers reaching the JobManager.
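Two hedged sketches (the deployment and service names are illustrative): exposing a port externally via NodePort, and a headless Service for internal discovery only.

# Expose the REST port outside the cluster as a NodePort Service:
kubectl expose deployment flink-jobmanager \
  --name=flink-jobmanager-rest --type=NodePort --port=8081

# A headless Service (clusterIP: None) for internal interconnection only:
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  clusterIP: None
  selector:
    app: flink-jobmanager
  ports:
  - port: 6123    # RPC port the TaskManagers use to reach the JobManager
EOF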

  • Pod

A Pod is the smallest schedulable unit in K8s; everything K8s schedules is a Pod. Each Pod can contain one or more Containers. Each Container has its own resources, isolated from the others, but all containers in a Pod share the same network, which means they can communicate directly with each other through localhost.

In addition, Containers in a Pod can share files through a Volume. For example, when the JobManager or TaskManager Pod produces logs, running a log collector as a second process inside the same container would go against K8s’s native semantics of one process per container; instead, a SideCar, i.e. another Container in the same Pod, can collect the JobManager’s logs. This is a concrete use of multiple containers in one Pod.
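A minimal sketch of that sidecar pattern (the images and paths are illustrative): the two containers share a log directory through an emptyDir Volume.

kubectl apply -f - <<'EOF'
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-with-sidecar
spec:
  volumes:
  - name: logs
    emptyDir: {}            # shared between the two containers
  containers:
  - name: jobmanager        # main container writes logs into the volume
    image: flink:1.10
    args: ["jobmanager"]
    volumeMounts:
    - name: logs
      mountPath: /opt/flink/log
  - name: log-collector     # sidecar reads the same directory
    image: busybox:1.31
    command: ["sh", "-c", "sleep 10; tail -F /opt/flink/log/*.log"]
    volumeMounts:
    - name: logs
      mountPath: /opt/flink/log
EOF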

  • Deployment

Because a bare Pod can be terminated at any time and is not pulled up again on its own, it cannot perform failover and related operations by itself. Deployment provides a higher level of abstraction on top of Pods. A Deployment declares the desired Pod state, for example that five TaskManagers are required, and maintains that state: when a TaskManager dies, it starts a new one as a replacement. This spares you from reporting Pod status yourself and from implementing complex failover management, and it embodies the most basic idea here: automated operations.
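A quick sketch of this behavior (the deployment name and label are illustrative):

# Declare five TaskManagers; the Deployment controller keeps that count.
kubectl scale deployment flink-taskmanager --replicas=5

# Kill one Pod; a replacement is started automatically.
kubectl delete pod <one-taskmanager-pod>
kubectl get pods -l app=flink-taskmanager    # back to five shortly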

Current usage

What kinds of tasks are currently running on K8s?

In addition to traditional stateless web and mobile workloads, stateful services such as MySQL and Kafka are continually being adapted to run on K8s. The deep learning framework TensorFlow can run natively on K8s, and big data frameworks such as Spark and Flink keep improving their compatibility and adaptation so that more big data services run well on K8s.

We can see that K8s is more inclusive than Yarn or traditional Hadoop: storage, deep learning, big data, and OLAP analysis frameworks and engines can all run on it. One big benefit is that an entire company can run its storage, real-time computing, batch computing, deep learning, OLAP analytics, and so on in a single cluster, managing just one scheduling architecture. Besides easier administration, this also brings better cluster utilization.

Deployment evolution of Flink On Kubernetes

The easiest way to deploy Flink on K8s is the Standalone mode. The advantage of this deployment is that nothing in Flink needs to change: Flink is unaware of the K8s cluster, and it can be operated entirely by external means.

Standalone Session On K8s

Steps to run the Standalone mode on K8s:

Standalone Session On K8s mode

As shown in the figure:

  • Step 1: Submit the resource definitions to the K8s Master using Kubectl or the K8s Dashboard.
  • Step 2: The K8s Master dispatches the requests to create the Flink Master Deployment, TaskManager Deployment, ConfigMap, and SVC to the Slave nodes, which create these four roles. The Flink Master and TaskManagers then start.
  • Step 3: The TaskManagers register with the JobManager; in the non-HA case they reach it through an internal Service.
  • At this point, Flink’s Session Cluster has been created, and tasks can now be submitted.
  • Step 4: Run the flink run command against the Flink Cluster, submitting the task by specifying the address of the Flink Master. The user’s Jar and JobGraph are generated in the Flink Client and sent via the SVC to the Dispatcher.
  • Step 5: The Dispatcher sees that a new Job has been submitted and creates a new JobMaster to run it.
  • Step 6: The JobMaster applies for resources from the ResourceManager. Since Standalone mode cannot actively apply for resources, the ResourceManager returns directly; the TaskManagers have already registered in advance.
  • Steps 7-8: The JobMaster deploys the Tasks to the corresponding TaskManagers, and the whole execution flow is complete. (A minimal deployment sketch follows this list.)
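In practice, the four roles above map to a handful of manifests; a minimal sketch (the manifest names follow the examples in the Flink documentation; adjust them to your own files):

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml

# Then submit against the exposed JobManager address (step 4 above):
./bin/flink run -m <jobmanager-address>:8081 examples/streaming/WindowJoin.jar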

Standalone Perjob on K8s

Now let’s look at the Perjob deployment. Session clusters and Perjob clusters address different scenarios: a Session can run multiple tasks but cannot isolate them well from one another, while in Perjob mode each job has its own independent Flink Cluster to run in, and the jobs are independent of each other.

■ Perjob features:

  1. The user’s jars and dependencies are either pre-built into the image or downloaded by an Init Container before the actual Container starts.
  2. Each Job starts a fresh Cluster.
  3. Submission is one-step: there is no need to first start the Cluster and then submit the task, as a Session Cluster requires.
  4. The user’s main method runs inside the Cluster. In special network environments where the main method must run in the Cluster, Session mode cannot be used, while Perjob mode can.
Standalone Perjob on K8s

■ Procedure:

Execution is driven by the Standalone JobCluster EntryPoint, which finds the user Jar on the classpath and executes its main method to obtain the JobGraph. The JobGraph is submitted to the Dispatcher, which uses the Recover Job logic to hand it to a JobMaster. The JobMaster applies for resources from the ResourceManager, requests slots, and executes the job.

Helm Chart way

Helm is to K8s roughly what Yum is to Linux.

Helm in K8s is a package management tool that makes it easy to install a package. An operation such as deploying a Flink cluster can be completed in one step with a single helm install. There is essentially no difference from the Standalone deployment underneath; it is just repackaged with Helm, including templates and so on, to make it easier to use.
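A hedged sketch (the chart repository and chart name are hypothetical; real Flink charts vary by vendor):

# Hypothetical chart repository and chart name.
helm repo add my-flink-charts https://example.com/charts
helm install flink-session my-flink-charts/flink \
  --set taskmanager.replicas=2

# Tearing the whole cluster down again is likewise one step:
helm uninstall flink-session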

Flink Kubernetes Operator

Flink Kubernetes Operator mode

  • Task life cycle management

Using an Operator to manage Flink is mainly for managing many clusters; it plays the role of task lifecycle management. It is not on the same level as the Standalone or Native approaches per se; it is more like a higher-level tool for task management.

  • Based on K8s Operator, it is convenient to create Flink Cluster.

To create a Perjob Cluster by hand, you may need to deploy several times; to upgrade the task, you may even need to delete the previous Cluster, modify the configuration, and redeploy.

With a K8s Operator, only a few simple operations are required. The Operator has its own YAML description of the cluster, and modifying a single field, such as the image version, automatically triggers a series of operations in the background: taking a savepoint of the currently executing task, destroying the Cluster, pulling a new cluster up from that savepoint, and so on. Configuration changes to Flink can likewise be applied automatically in the background.
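As a rough sketch of the idea (the apiVersion, kind, and field names below are illustrative, not an exact schema; the Lyft and Google operators each define their own CRDs):

kubectl apply -f - <<'EOF'
apiVersion: flink.k8s.io/v1beta1    # illustrative; each operator has its own
kind: FlinkApplication              # illustrative kind
metadata:
  name: my-flink-job
spec:
  image: <ImageName>:v2    # bumping this triggers savepoint + redeploy
  parallelism: 4
EOF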

There are currently two open source Operators, from Lyft and Google. They are similar in function, integrate well with the current Standalone Cluster, and have reached the production-availability standard.

Reference:

1.lyft/flinkk8soperator

github.com/lyft/flinkk8soperator

2.GoogleCloudPlatform/flink-on-k8s-operator

github.com/GoogleCloudPlatform/flink-on-k8s-operator

Conclusion

Of course, Flink on K8s also has some shortcomings at present:

  • Whether via Operator, Helm Chart, or Kubectl with YAML directly, Flink does not sense the presence of K8s.
  • Resource allocation is static: how many TaskManagers are required must be determined in advance. If a Job’s parallelism needs to be adjusted, the TaskManager resources must be adjusted accordingly, or the task cannot execute properly.
  • Running Flink on K8s requires a basic understanding of containers, Operators, or K8s.
  • It is not very friendly for batch tasks, or for submitting multiple jobs in one Session, because resources cannot be applied for and released in real time. TaskManager resources are fixed, while batch jobs may run in multiple stages and need to acquire and release resources as they go, which currently cannot be done. Running multiple jobs in a Session and ending them one by one has the same problem: a large Session Cluster may waste resources, while a small one may make jobs run slowly or fail to run at all.

Based on these points, we promoted a Native integration scheme in the community. Similar to Yarn’s native integration, it enables Flink to sense the existence of the underlying Cluster.

Technical details of Native integration

Why is it called Native? The name carries the following meanings.

  • Resource application: Flink’s Client has a built-in K8s Client, which it uses to create the JobManager. After the Job is submitted, whenever it needs resources, the JobManager applies to Flink’s ResourceManager, which talks directly to the K8s API Server and forwards the requests to the K8s Cluster, telling it how many TaskManagers are required and how large each TaskManager should be. When the task completes, it likewise tells the K8s Cluster to release the unused resources. In other words, Flink knows in a first-class way that the K8s Cluster exists, and knows when to apply for resources and when to release them.
  • Native is relative to Flink itself: using Flink’s own commands, the whole thing is self-contained; no external tools are needed to get Flink running on K8s.

How does it work? Let’s look at the Session and Perjob modes in turn.

Native Kubernetes Session mode

Native Kubernetes Session mode

First of all, Session mode.

  • Phase 1: Start the Session Cluster. The Flink Client, using its built-in K8s Client, tells the K8s Master to create the Flink Master Deployment, ConfigMap, and SVC. Once they are created, the Master pulls up. At this point the Session is deployed, but no TaskManagers are running yet.
  • Phase 2: When a user submits a Job, through the Flink Client or the Dashboard, it is sent via the Service to the Dispatcher, which generates a JobMaster. The JobMaster applies for resources from the K8s ResourceManager. Finding no resources available, the ResourceManager in turn requests them from the K8s Master, and once they are granted, brings up TaskManagers. After a TaskManager starts and registers with the ResourceManager, the ResourceManager applies for slots on it for the JobMaster, and the JobMaster deploys tasks onto the TaskManager. This completes the whole flow, from pulling up the Session to the user’s job submission.
  • Note that the SVC here is an External Service: you must ensure the Client can reach the Master through it. In many K8s clusters, K8s and the Flink Client are not in the same network environment; in that case a LoadBalancer or NodePort can make the JobManager’s Dispatcher reachable from the Flink Client, since otherwise Jar packages cannot be submitted. The exposure type can be chosen as sketched below.
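For example (a minimal sketch; the cluster id and image are placeholders taken from the demo later in this article), the exposure type of the rest Service can be set when starting the session:

./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.rest-service.exposed.type=NodePort

# kubernetes.rest-service.exposed.type accepts ClusterIP, NodePort, or
# LoadBalancer; pick one the Flink Client can actually reach.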

Native Kubernetes Perjob mode

Native Kubernetes Perjob mode

Let’s take a look at the Perjob mode. As shown in the figure, it is similar to the Session flow above; the difference is that there is no need to start a Session Cluster first and then submit the task.

  • The Service, Master, and ConfigMap resources are created first. The Flink Master Deployment already contains the user Jar, so the entrypoint extracts it and runs the user’s main method to generate the JobGraph. The Dispatcher then generates the JobMaster, which applies for resources from the ResourceManager; the logic from there on is the same as in Session mode.
  • The biggest difference from Session mode is one-step submission. Because there is no two-step submission, the External Service can be dispensed with if the external UI does not need to be accessed after the task is up: the task runs directly through the one-step submission, and Flink’s Web UI can still be reached through a local port-forward or through a proxy on the K8s ApiServer. The External Service is then no longer needed, meaning no LoadBalancer or NodePort has to be held. That is the Perjob mode; a port-forward sketch follows below.
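A minimal sketch of reaching the Web UI without an External Service (the cluster id is illustrative; the native integration names the rest Service <cluster-id>-rest):

kubectl port-forward service/k8s-app1-rest 8081:8081
# Then open http://localhost:8081 on the local machine.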

The difference between Session and Perjob modes

So what’s the difference between Session and Perjob?

Demo

Session

  1. Start the Session

Note: <ImageName> needs to be replaced with your own image or an official Flink image from the Docker Hub library.

./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.jobmanager.cpu=1 \
-Dkubernetes.taskmanager.cpu=2
  2. Submit a job to the Session
./bin/flink run -d -p 10 -e kubernetes-session -Dkubernetes.cluster-id=k8s-session-1 examples/streaming/WindowJoin.jar
  3. Stop the Session
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=k8s-session-1 -Dexecution.attached=true

Application

Application mode is a new mode introduced by FLIP-85, intended in the long run to replace the community’s current PerJob mode. Currently, the biggest difference between Application mode and PerJob mode is whether the user code runs on the client side or on the JobManager side. On K8s deployments it is easy to support Application mode, because the user’s jars and dependencies can be pre-built into the image.

Note: Application mode is only supported from Flink 1.11 on and requires a matching Flink client and image. You can consult the community documentation to build your own image.

./bin/flink run-application -p 10 -t kubernetes-application \
-Dkubernetes.cluster-id=k8s-app1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
local:///opt/flink/examples/streaming/WindowJoin.jar

Current function status

  • Native Kubernetes Session mode

FLINK-9953: released in Flink 1.10:

issues.apache.org/jira/browse/FLINK-9953

  • Native Kubernetes Application mode

FLINK-10934: scheduled for release in Flink 1.11

issues.apache.org/jira/browse/FLINK-10934

  • High availability in Native mode

FLINK-12884: the current high-availability mode is implemented on ZooKeeper. In the future we hope to do Meta storage and leader election based on K8s ConfigMaps, without relying on external components. An internal implementation already exists and will be contributed to the community.

issues.apache.org/jira/browse/FLINK-12884

  • Other features:

FLINK-14460: planned for release and improvement across Flink 1.11/1.12

issues.apache.org/jira/browse/FLINK-14460

The contents are as follows:

  • Label, annotation, node-selector: to schedule a task onto particular machines in the cluster you may need a node-selector; to give the cluster a particular label so that it can be addressed externally, you can use labels; and so on.
  • Sidecar Container: helps collect logs.
  • Init Container: downloads the jars before the JobManager and TaskManager start, so the same image can be used without building the user jars into it.
  • Storage optimization.
  • Pod templates, for less common capabilities, and so on.

At present, the community’s Flink on K8s native solution is still evolving and improving rapidly. We hope you will try it out and give feedback, and if you are interested, you are very welcome to join the development.