PyFlink is the entry point for using Flink with Python. Python itself is easy to learn, but a PyFlink development environment is not easy to set up: one careless step can leave the environment in a broken state whose cause is hard to track down. Zeppelin Notebook is a PyFlink development environment that helps you avoid these problems. The main contents are:
- Preparation
- Build the PyFlink environment
- Summary and Future
You’ve probably heard of Zeppelin before, but previous articles have focused on how to develop Flink SQL in Zeppelin. Today’s article introduces how to develop PyFlink jobs efficiently in Zeppelin and, in particular, how to solve PyFlink’s environment problems.
Zeppelin Notebook uses Conda to create a Python environment that is automatically deployed to the Yarn cluster. You don’t need to manually install any PyFlink packages on the cluster, and you can use multiple isolated versions of PyFlink in the same Yarn cluster at the same time. This is what you end up with:
1. The ability to use third-party Python libraries, such as Matplotlib, on the PyFlink client.
2. The ability to use third-party Python libraries, such as pandas, in PyFlink UDFs.
Let’s see how to do that.
1. Preparation
Step 1.
Build the latest version of Zeppelin; the build process itself is not covered here. If you have questions, you can join the Flink on Zeppelin DingTalk group (34517043). Also note that Zeppelin must be deployed on a Linux machine: a Conda environment built on a Mac cannot be used in the Yarn cluster, because Conda packages are not compatible across operating systems.
Step 2.
Download Flink 1.13. Note that the features described in this article only work with Flink 1.13 or above. Then:
- Copy the flink-python jar (opt/flink-python-*.jar) into Flink’s lib folder.
- Copy the opt/python folder into Flink’s lib folder.
A sketch of both steps is shown below.
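For reference, here is a minimal shell sketch of these two copy steps. It assumes a hypothetical FLINK_HOME variable pointing to the extracted Flink 1.13 directory, and the exact jar name may include a Scala version suffix:
%sh
# Assumption: FLINK_HOME points to your Flink 1.13 installation directory.
cp $FLINK_HOME/opt/flink-python*.jar $FLINK_HOME/lib/
cp -r $FLINK_HOME/opt/python $FLINK_HOME/lib/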
Step 3.
Install the following software (used to create the Conda env):
- Miniconda: https://docs.conda.io/en/latest/miniconda.html
- conda-pack: https://conda.github.io/conda-pack/
- Mamba: https://github.com/mamba-org/mamba
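If these tools are not installed yet, a rough installation sketch looks like the following; the installer URL assumes a Linux x86_64 machine, and your paths may differ:
%sh
# Download and install Miniconda (assumes a Linux x86_64 machine).
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3
source $HOME/miniconda3/bin/activate
# Install conda-pack and mamba from the conda-forge channel.
conda install -y -c conda-forge conda-pack mamba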
2. Build the PyFlink environment
Now you can build and use a PyFlink environment in Zeppelin.
Step 1. Create PyFlink Conda environment on JobManager
Because Zeppelin has built-in Shell support, you can create the PyFlink environment with shell commands inside Zeppelin. This environment contains the third-party Python packages needed on the PyFlink client (JobManager) side, such as Matplotlib, and must include at least the following packages:
- Python (3.7 is used here)
- apache-flink (1.13.1 is used here)
- jupyter, grpcio, and protobuf (these three packages are required by Zeppelin)
The remaining packages can be specified as needed:
%sh
# make sure you have conda and mamba installed.
# install miniconda: https://docs.conda.io/en/latest/miniconda.html
# install mamba: https://github.com/mamba-org/mamba
echo "name: pyflink_env
channels:
  - conda-forge
  - defaults
dependencies:
  - python=3.7
  - pip
  - apache-flink==1.13.1
  - jupyter
  - grpcio
  - protobuf
  - matplotlib
  - pandasql
  - pandas
  - scipy
  - seaborn
  - plotnine
" > pyflink_env.yml

mamba env remove -n pyflink_env
mamba env create -f pyflink_env.yml
Run the following code to package PyFlink’s Conda environment and upload it to HDFS (note that the package is in tar.gz format):
%sh
rm -rf pyflink_env.tar.gz
conda pack --ignore-missing-files -n pyflink_env -o pyflink_env.tar.gz
hadoop fs -rmr /tmp/pyflink_env.tar.gz
hadoop fs -put pyflink_env.tar.gz /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_env.tar.gz
Step 2. Create the PyFlink Conda environment on TaskManager
Run the following code to create a PyFlink Conda environment on the TaskManager side. It must contain at least the following two packages:
- Python (3.7 is used here)
- apache-flink (1.13.1 is used here)
The remaining packages are those your Python UDFs depend on; here, for example, pandas is specified:
Echo "name: pyflink_tm_env Channels: -conda-forge - defaults Dependencies: -python =3.7 - PIP - PIP -apache-flink == 1.13.1-pandas "> pyFlink_tm_env. Yml mamba env remove -n pyFlink_tm_env mamba env create -f pyflink_tm_env.ymlCopy the code
Run the following code to package PyFlink’s Conda environment and upload it to HDFS (note that the ZIP format is used here):
%sh
rm -rf pyflink_tm_env.zip
conda pack --ignore-missing-files --zip-symlinks -n pyflink_tm_env -o pyflink_tm_env.zip
hadoop fs -rmr /tmp/pyflink_tm_env.zip
hadoop fs -put pyflink_tm_env.zip /tmp
# The Python conda tar should be public accessible, so need to change permission here.
hadoop fs -chmod 644 /tmp/pyflink_tm_env.zip
Step 3. Use Conda in PyFlink
Now you can use the Conda environment created above in Zeppelin. First, you need to configure Flink in Zeppelin.
- Set flink.execution.mode to yarn-application. The method described in this article only works in yarn-application mode.
- Specify yarn.ship-archives, zeppelin.pyflink.python, and zeppelin.interpreter.conda.env.name to configure the PyFlink Conda environment on the JobManager side.
- Specify python.archives and python.executable to configure the PyFlink Conda environment on the TaskManager side.
- Specify other optional Flink configurations as needed, such as flink.jm.memory and flink.tm.memory here.
%flink.conf

flink.execution.mode yarn-application

yarn.ship-archives /mnt/disk1/jzhang/zeppelin/pyflink_env.tar.gz
zeppelin.pyflink.python pyflink_env.tar.gz/bin/python
zeppelin.interpreter.conda.env.name pyflink_env.tar.gz

python.archives hdfs:///tmp/pyflink_tm_env.zip
python.executable pyflink_tm_env.zip/bin/python3.7

flink.jm.memory 2048
flink.tm.memory 2048
You can then use PyFlink and the specified Conda environment in Zeppelin as stated at the beginning. There are two scenarios:
- On the PyFlink client (JobManager side), you can use libraries from the JobManager Conda environment created above, such as Matplotlib; see the first sketch below.
- In PyFlink UDFs, you can use libraries from the TaskManager Conda environment created above, such as pandas; see the second sketch below.
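The original examples were shown as screenshots. The following is only an illustrative sketch of the first scenario, assuming Zeppelin’s %flink.ipyflink paragraph is used; the plot itself is made up for the example:
%flink.ipyflink
# Runs on the PyFlink client (JobManager side), so it can use any
# library from pyflink_env.tar.gz, e.g. Matplotlib.
import matplotlib.pyplot as plt
import numpy as np

x = np.linspace(0, 10, 100)
plt.plot(x, np.sin(x))
plt.show()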
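And a similar sketch of the second scenario; the st_env variable is assumed to be the Python StreamTableEnvironment exposed by Zeppelin’s Flink interpreter, and the UDF itself is a made-up example:
%flink.ipyflink
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# The UDF body imports pandas, so pandas must be available on the
# TaskManager side, i.e. inside pyflink_tm_env.zip.
@udf(result_type=DataTypes.STRING())
def pandas_version(i):
    import pandas as pd
    return pd.__version__

t = st_env.from_elements([(1,), (2,)], ['v'])
t.select(pandas_version(t.v)).execute().print()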
3. Summary and future
Zeppelin Notebook uses Conda to create a Python environment that is automatically deployed to the Yarn cluster, without manually installing any PyFlink packages on the cluster. You can also use multiple versions of PyFlink in the same Yarn cluster.
Each PyFlink environment is isolated, and the Conda environment can be customized at any time. You can download the following note and import it into Zeppelin to reproduce the content of this article: http://23.254.161.240/#/notebook/2G8N1WTTS
There are still many things that can be improved:
- Currently we need to create two Conda envs because Zeppelin supports tar.gz while Flink only supports ZIP. Once the two formats are unified, only one Conda env will be needed.
- The apache-flink package currently bundles Flink’s jar files, which makes the generated Conda env very large and slows down Yarn container initialization. This requires the Flink community to provide a lightweight Python package (without the Flink jars), which would greatly reduce the size of the Conda env.