In the field of distributed big data computing engines, Apache Spark, the most widely used engine, has long supported the Python programming language and offers API support for ML (machine learning) and DM (data mining). Flink, as a third-generation computing engine, added Python support (PyFlink) in version 1.9.0. In Flink 1.10, PyFlink added support for Python UDFs (user-defined functions), which can be registered and used in the Table API/SQL. Starting with Flink 1.11, PyFlink jobs can also run natively on Windows, so you can develop and debug PyFlink jobs on Windows.
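As a quick illustration of the Python UDF support mentioned above, a scalar function can be declared with the udf decorator and registered for use in the Table API/SQL. The following is a minimal sketch, assuming PyFlink 1.12+ and an already-created StreamTableEnvironment named t_env; the add_one function and the random_source table are illustrative names, not part of this article's setup:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# A scalar Python UDF: takes an integer column and returns it plus one.
@udf(result_type=DataTypes.BIGINT())
def add_one(i):
    return i + 1

# Register the function under a name so SQL queries can call it.
t_env.create_temporary_function("add_one", add_one)
t_env.execute_sql("SELECT add_one(f_sequence) FROM random_source")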
Installing PyFlink
PyFlink is quite simple to install.
- Start by checking your system's Python version (PyFlink requires Python 3.6, 3.7, or 3.8):
$ python3 --version  # The version printed here must be 3.6, 3.7, or 3.8
- Environment configuration. Since the system may contain multiple versions of Python, it may also contain multiple Python binary executables. Run the following ls command to find out which Python binaries are available on the system:
$ ls /usr/bin/python*
- Create a soft link so that python points to your python3 interpreter:
$ ln -s /usr/bin/python3 python
- Installing PyFlink. PyFlink is still changing rapidly from one version to the next, so simply install the latest version (apache-flink 1.13.2 at the time of writing this article):
$ python3 -m pip install apache-flink
Windows development environment configuration
Here we use the PyCharm IDE for PyFlink development on Windows.
- First, configure the Python virtual environment path:
PyCharm -> Preferences -> Project Interpreter
Remember to select Python version 3.6, 3.7, or 3.8.
- Create a new project and select the Python virtual environment we just configured
- Install PyFlink. Open the terminal inside PyCharm and check the Python version first, then install PyFlink. When the installation finishes, you can find the pyflink directory under site-packages; the short sketch below shows how to verify its location.
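If you want to confirm where PyFlink landed, a minimal check like the following prints the directory the pyflink package was imported from:

import os
import pyflink

# Print the directory pyflink was loaded from; it should be under the
# interpreter's site-packages directory.
print(os.path.dirname(os.path.abspath(pyflink.__file__)))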
Hello World example
Create a new .py file and enter the following code:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

def hello_world():
    """Reads data from a random source and prints it directly via the print sink."""
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    source_ddl = """CREATE TABLE random_source (
        f_sequence INT,
        f_random INT,
        f_random_str STRING
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '5',
        'fields.f_sequence.kind' = 'sequence',
        'fields.f_sequence.start' = '1',
        'fields.f_sequence.end' = '1000',
        'fields.f_random.min' = '1',
        'fields.f_random.max' = '1000',
        'fields.f_random_str.length' = '10'
    )"""
    sink_ddl = """CREATE TABLE print_sink (
        f_sequence INT,
        f_random INT,
        f_random_str STRING
    ) WITH (
        'connector' = 'print'
    )"""
    # register source and sink
    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)
    # data extraction
    tab = t_env.from_path("random_source")
    # Submit the job synchronously for now: execute_insert() submits the job
    # and wait() blocks until it finishes (the asynchronous submission path
    # still needs more testing), so no separate execute call is needed.
    tab.execute_insert("print_sink").wait()

if __name__ == '__main__':
    hello_world()
The results are as follows
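Each parallel subtask of the print connector writes its rows to stdout, prefixed with the subtask index and the row kind (+I for an insert). The rows below are illustrative only, since the source generates random values and the exact row formatting differs between Flink versions:

1> +I[1, 437, 9f3a1c2b4d]
2> +I[2, 85, 0e7d2a9c1f]
1> +I[3, 912, 7b4e0d8a2c]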
You are welcome to exchange ideas and learn together.