Following the previous article, "Unlock Cloud Native AI Skills | Build a Machine Learning System on Kubernetes", in which we set up Kubeflow Pipelines together, this article walks through a real case to learn how to develop a machine learning workflow based on Kubeflow Pipelines.

Preparation

A machine learning workflow is both task-driven and data-driven. It involves data import and preparation, model training with checkpoint export and evaluation, and final model export. This requires distributed storage as the transmission medium; in this example, NAS is used as the distributed storage.

**Create distributed storage. The following uses NAS as an example.** Replace NFS_SERVER_IP with your real NAS server address.

  • 1. To create the Alibaba Cloud NAS service, refer to the documentation [1].
  • 2. Create /data on the NFS server:
# mkdir -p /nfs
# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
# mkdir -p /nfs/data
# cd /
# umount /nfs
  • 3. Create a Persistent Volume:
# cat nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: user-susan
  labels:
    user-susan: pipelines
spec:
  persistentVolumeReclaimPolicy: Retain
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteMany
  nfs:
    server: NFS_SERVER_IP
    path: "/data"
# kubectl create -f nfs-pv.yaml
  • 4. Create a Persistent Volume Claim:
# cat nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: user-susan
  annotations:
    description: "this is the mnist demo"
    owner: Tom
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
       storage: 5Gi
  selector:
    matchLabels:
      user-susan: pipelines
# kubectl create -f nfs-pvc.yaml

Developing the Pipeline

Since the examples provided by Kubeflow Pipelines all depend on Google's storage services, users in China cannot really experience the capabilities of Pipelines. For this reason, this article provides an MNIST training example based on Alibaba Cloud NAS storage, so that you can try Kubeflow Pipelines end to end.

The specific steps are as follows:

  • (1) Download data
  • (2) Use TensorFlow for model training
  • (3) Model export

Each of the above three steps depends on the completion of the previous one.

See standalone_pipeline.py [2] for the complete code.

In our example we use arena_op, an API based on the open source project Arena [3] that wraps Kubeflow's default container_op. It can seamlessly integrate the MPI and Parameter Server modes of distributed training, supports simple access to heterogeneous devices such as GPU and RDMA as well as to distributed storage, and makes it easy to sync code from a Git source, which makes it a fairly practical tool API.

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):
  """A pipeline for end to end machine learning workflow."""
  data = ["user-susan:/training"]
  gpus = 1
  # 1. prepare data
  prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
      cd /training/dataset/mnist && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
  # 2. download source code and train the models
  train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''echo %s; python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
      --max_steps 500 --data_dir /training/dataset/mnist \
      --log_dir /training/output/mnist --learning_rate %s \
      --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])
  # 3. export the model
  export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s; python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

Kubeflow Pipelines converts the above code into a directed acyclic graph (DAG), where each node is a Component and the edges between Components represent the dependencies between them. You can see the DAG in the Pipelines UI:

  • First, let's look at the data preparation step in detail.

Here arena.standalone_job_op is Arena's Python API. For this step you need to specify: the name of the step (name), the container image to use (image), and the data to be used together with its mount directory inside the container (data).

data is an array, so multiple data volumes can be mounted. For example, data=["user-susan:/training"] mounts user-susan, the Persistent Volume Claim created earlier, at /training inside the container.
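As a purely illustrative sketch, mounting more than one volume just means adding entries to the array (the second claim, dataset-pvc, is hypothetical and not created in this article):

# Illustrative only: mount two PVCs at different in-container paths.
# "dataset-pvc" is a hypothetical claim, not one created in this article.
data = ["user-susan:/training", "dataset-pvc:/dataset"]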

prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
      cd /training/dataset/mnist && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
      curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")

What this step actually does is download the data into /training/dataset/mnist on the distributed storage, where /training is the root mount point of the distributed storage and /training/dataset/mnist is a subdirectory. The subsequent steps can read the data by using the same root mount point.

  • The second step uses the data downloaded to distributed storage, checks out the code at a fixed Git commit ID, and runs model training.
train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''echo %s; python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
      --max_steps 500 --data_dir /training/dataset/mnist \
      --log_dir /training/output/mnist --learning_rate %s \
      --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])

As you can see, this step is a bit more complicated than data preparation. Besides name, image, data, and command, which were also required in the first step, the model training step also needs to specify:

  • How the code is obtained: from the point of view of reproducible experiments, being able to trace the exact code that was run is very important. GIT_SYNC_REV specifies the commit ID of the training code.

  • gpus: the default value is 0, meaning no GPU is used; an integer greater than 0 is the number of GPUs required by this step.

  • metrics: also for the purpose of reproducible and comparable experiments, users can export a series of metrics and display and compare them intuitively in the Pipelines UI. Using it takes two steps (see the sketch after this list):

    1. When calling the API, specify the metrics to collect as an array of the metric name and its display format, PERCENTAGE or RAW, for example metrics=["Train-accuracy:PERCENTAGE"].

    2. Since Pipelines collects metrics from stdout logs by default, the model code that actually runs must print the metric in the form {metrics name}={value} or {metrics name}:{value}; see the sample code [4].
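A minimal sketch of step 2 (the value is a placeholder, not taken from the sample repository): the training code simply prints the metric to stdout, with a name matching the one declared in metrics:

# Sketch: print the metric to stdout in "name=value" form so that it can
# be collected; the name must match metrics=["Train-accuracy:PERCENTAGE"].
accuracy = 0.97  # placeholder for the value computed by the evaluation code
print("Train-accuracy=%s" % accuracy)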

It is worth noting:

This step specifies the same data parameter ["user-susan:/training"] as prepare_data, so the training code can read the corresponding data, for example --data_dir /training/dataset/mnist. And since this step depends on prepare_data, referencing prepare_data.output in the method declares the dependency between the two steps.
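As a hedged aside, assuming the ops returned by arena.standalone_job_op behave like standard kfp dsl.ContainerOps, the SDK also allows declaring the ordering explicitly instead of threading an output through the command:

# Assumption: the op exposes the standard dsl.ContainerOp interface.
# Declare ordering explicitly rather than consuming prepare_data.output.
train.after(prepare_data)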

  • The last step, export_model, generates the trained model from the checkpoint produced by train:
export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s; python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))

export_model is similar to, and even simpler than, train: it just syncs the export code from Git and runs it against the checkpoint in the shared directory /training/output/mnist to export the model.
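For intuition, the following is a minimal sketch of what a TensorFlow 1.x checkpoint-to-SavedModel export typically looks like; it is an illustrative assumption, not the actual export_model.py from the sample repository:

import tensorflow as tf

# Hedged sketch, not the sample repo's code: restore the latest checkpoint
# and write a SavedModel under a numeric version directory.
checkpoint_dir = '/training/output/mnist'
export_dir = '/training/output/models/1'  # corresponds to model_version=1

with tf.Session() as sess:
    ckpt = tf.train.latest_checkpoint(checkpoint_dir)
    saver = tf.train.import_meta_graph(ckpt + '.meta')
    saver.restore(sess, ckpt)
    builder = tf.saved_model.builder.SavedModelBuilder(export_dir)
    builder.add_meta_graph_and_variables(
        sess, [tf.saved_model.tag_constants.SERVING])
    builder.save()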

The workflow looks pretty straightforward, so you can define a Python method to tie it all together:

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):

@dsl.pipeline is a decorator that marks the method as a workflow; it requires two properties, name and description. The entry method sample_pipeline defines four parameters, learning_rate, dropout, model_version, and commit, which are used in the train and export_model steps above. The parameter values are actually of type dsl.PipelineParam [5]: defining them as dsl.PipelineParam lets the native Kubeflow Pipelines UI render them as an input form, where each field's key is the parameter name and its default value is the parameter's value. Note that dsl.PipelineParam values can only be strings and numbers; arrays, maps, and custom types cannot be converted.
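A small illustration of that constraint (a throwaway pipeline, not part of this article's workflow):

import kfp.dsl as dsl

# String and number defaults become input-form fields in the Pipelines UI.
@dsl.pipeline(name='param demo', description='valid parameter types')
def param_demo(learning_rate='0.01',  # string: OK
               max_steps=500):        # number: OK
    pass

# Defaults such as layers=[128, 64] or options={'k': 'v'} would not convert.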

In fact, these parameters can be overridden when the user submits the workflow. Here is the UI for submitting the workflow:

Submitting the Pipeline

You can submit the Pipeline code developed above to the Kubeflow Pipelines service in your own Kubernetes cluster through the Python kfp SDK:

import kfp
import kfp.compiler as compiler

KFP_SERVICE = "ml-pipeline.kubeflow.svc.cluster.local:8888"

# EXPERIMENT_NAME, RUN_ID, and the parameter variables below are defined
# elsewhere in the full script.
compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
client = kfp.Client(host=KFP_SERVICE)
try:
    experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
except Exception:
    experiment_id = client.create_experiment(EXPERIMENT_NAME).id
run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
                          params={'learning_rate': learning_rate,
                                  'dropout': dropout,
                                  'model_version': model_version,
                                  'commit': commit})

compiler.compile compiles the Python code into a DAG configuration file that the execution engine (Argo) recognizes; the Kubeflow Pipelines client then creates an experiment (or finds an existing one) and submits the previously compiled DAG configuration file.

For convenience, you can start a Python 3 client Pod inside the cluster and log into it:

# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
# kubectl exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash
Copy the code

After logging into the Python 3 environment, execute the following commands to submit two tasks in succession with different parameters:

# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
# curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
# python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
# python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3
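As a hedged sketch (not the actual contents of standalone_pipeline.py, which may differ), the script presumably parses these flags and forwards them as pipeline parameters, reusing the client and experiment_id from the submission snippet above:

import argparse

# Hypothetical sketch: parse the flags used in the commands above and
# forward them as pipeline parameters; defaults are illustrative.
parser = argparse.ArgumentParser()
parser.add_argument('--learning_rate', default='0.001')
parser.add_argument('--dropout', default='0.9')
parser.add_argument('--model_version', default='1')
args = parser.parse_args()

run = client.run_pipeline(experiment_id, 'mnist-run', __file__ + '.tar.gz',
                          params={'learning_rate': args.learning_rate,
                                  'dropout': args.dropout,
                                  'model_version': args.model_version})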

Viewing the run results

Log in to the Kubeflow Pipelines UI, for example:

https://11.124.285.171/pipeline/#/experiments

Click the Compare Runs button to compare the two runs' inputs, duration, accuracy, and other metrics. Making experiments traceable is the first step toward making them reproducible, and Kubeflow Pipelines' own experiment management capability is the place to start.

Conclusion

The steps required to implement a running Kubeflow Pipeline are:


1. Build the minimum execution unit the Pipeline requires, the Component. If you use the natively defined dsl.ContainerOp, you need to build two parts of code:

  • Build runtime code: typically this means building a container image for each step, acting as an adapter between Pipelines and the code that actually executes the business logic. It takes the step's input parameters from the Pipelines context, invokes the business-logic code, and places the outputs to be passed to the next step at the in-container location Pipelines expects, leaving the hand-off to the underlying workflow components (a sketch of such an adapter follows the client code below). The result is that the runtime code and the business-logic code are coupled together. See the Kubeflow Pipelines examples [6].

  • Build client code: this step usually looks like the snippet below. Anyone familiar with Kubernetes will notice that it is essentially writing a Pod spec:

container_op = dsl.ContainerOp(
        name=name,
        image='<train-image>',
        arguments=[
            '--input_dir', input_dir,
            '--output_dir', output_dir,
            '--model_name', model_name,
            '--model_version', model_version,
            '--epochs', epochs
        ],
        file_outputs={'output': '/output.txt'})
container_op.add_volume(k8s_client.V1Volume(
        host_path=k8s_client.V1HostPathVolumeSource(
            path=persistent_volume_path),
        name=persistent_volume_name))
container_op.add_volume_mount(k8s_client.V1VolumeMount(
        mount_path=persistent_volume_path,
        name=persistent_volume_name))
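And, as mentioned above, a minimal sketch of the runtime-adapter side, assuming a hypothetical business-logic module my_business_logic (not from the Kubeflow repository): it reads the step's arguments, invokes the business logic, and writes the result to the path declared in file_outputs:

import argparse
import json

from my_business_logic import train_model  # hypothetical module

# Parse the inputs that the client code above passes as arguments.
parser = argparse.ArgumentParser()
parser.add_argument('--input_dir')
parser.add_argument('--output_dir')
parser.add_argument('--model_name')
parser.add_argument('--model_version')
parser.add_argument('--epochs', type=int)
args = parser.parse_args()

result = train_model(args.input_dir, args.output_dir,
                     args.model_name, args.model_version, args.epochs)

# Matches file_outputs={'output': '/output.txt'} in the client code above,
# so Pipelines can pass the value on to downstream steps.
with open('/output.txt', 'w') as f:
    f.write(json.dumps(result))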

The advantage of using the natively defined dsl.ContainerOp is flexibility: because the interaction interface with Pipelines is open, users can do a great deal at the container_op level. But it has problems:

  • Low reusability: every Component needs its own image build and runtime code;
  • High complexity: users must understand Kubernetes concepts such as resource limits, PVCs, node selectors, and so on;
  • Distributed training is hard to support: since container_op is a single-container operation, supporting distributed training requires submitting and managing TFJob-like tasks from inside the container_op. This raises both complexity and security challenges. The complexity is easy to understand; the security issue is that the permission to submit TFJob-like tasks must be granted to the Pipeline developer as extra privileges.

The other approach is arena_op, a reusable Component API. It uses generic runtime code, eliminating the need to build runtime code over and over, and a single shared arena_op API simplifies usage. It also supports the Parameter Server and MPI scenarios. We recommend building Pipelines this way.

2. Assemble the built Components into a Pipeline;

3. Compile the Pipeline into a DAG configuration file recognized by the Argo execution engine, submit it to the Kubeflow Pipelines service, and use the Kubeflow Pipelines UI to view the results.

Related links

[1] yq.aliyun.com/go/articleR…

[2] yq.aliyun.com/go/articleR…

[3] yq.aliyun.com/go/articleR…

[4] yq.aliyun.com/go/articleR…

[5] yq.aliyun.com/go/articleR…

[6] yq.aliyun.com/go/articleR…