In the field of distributed big-data computing engines, the most widely used one, Apache Spark, has long supported the Python programming language and provides API support for ML (machine learning) and DM (data mining). Flink, as a third-generation computing engine, has offered Python support (PyFlink) since version 1.9.0. Flink 1.10 added support for Python UDFs (user-defined functions), which can be registered and used in the Table API/SQL, and starting with Flink 1.11, PyFlink jobs can also run natively on Windows, so you can develop and debug PyFlink jobs on Windows.
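As a quick taste of that UDF support, here is a minimal sketch of defining and registering a Python scalar UDF for use in the Table API/SQL (the add_one function and the query in the comment are illustrative examples, not from this article):

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def add_one(x):
    # A trivial scalar UDF that increments its argument by one
    return x + 1

settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# Register the UDF so SQL can call it by name
t_env.create_temporary_function("add_one", add_one)
# It can now be used in a query, e.g.:
# t_env.execute_sql("SELECT add_one(f_sequence) FROM my_table")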

Installing PyFlink

PyFlink is quite simple to install

  • Start by checking your system's Python version (PyFlink requires Python 3.6, 3.7, or 3.8)
$ python3 --version # The version printed here must be 3.6, 3.7, or 3.8
  • Environment configuration. Since the system may contain multiple versions of Python, it also contains multiple Python binary executables. Run the following ls command to find out which Python binaries are available on the system:
$ ls /usr/bin/python*
  • Create a soft link python pointing to your python3 interpreter
$ ln -s /usr/bin/python3 /usr/bin/python
  • Install PyFlink. Since PyFlink is still evolving rapidly and can change significantly from one version to the next, just install the latest version (apache-flink 1.13.2 at the time of updating this article)
$ python3 -m pip install apache-flink
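To confirm the installation succeeded, you can ask pip which version was installed (a quick sanity check; the version shown should match what pip just installed):

$ python3 -m pip show apache-flink
Name: apache-flink
Version: 1.13.2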

Windows development environment configuration

Here we use the PyCharm IDE for PyFlink development on Windows

  • First configure the Python virtual environment under PyCharm -> Preferences -> Project Interpreter

Remember to select Python version 3.6, 3.7, or 3.8

  • Create a new project and select the Python virtual environment we just configured

  • Install PyFlink in PyCharm. Open the terminal interface and check the Python version first

    Then install PyFlink with the same pip command as above. When it finishes, you can find the pyflink directory under site-packages, as follows

  • Hello World example

Create a new .py file and enter the following code

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

def hello_world():
    """Reads data from a random source and prints it directly using the print sink."""
    settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    source_ddl = """
        CREATE TABLE random_source (
            f_sequence INT,
            f_random INT,
            f_random_str STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '5',
            'fields.f_sequence.kind' = 'sequence',
            'fields.f_sequence.start' = '1',
            'fields.f_sequence.end' = '1000',
            'fields.f_random.min' = '1',
            'fields.f_random.max' = '1000',
            'fields.f_random_str.length' = '10'
        )
    """

    sink_ddl = """
        CREATE TABLE print_sink (
            f_sequence INT,
            f_random INT,
            f_random_str STRING
        ) WITH (
            'connector' = 'print'
        )
    """

    # register source and sink
    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    # Data extraction
    tab = t_env.from_path("random_source")
    # We'll use the deprecated API for now, because the new asynchronous commit tests need to be improved...
    # execute_insert() already submits the job and wait() blocks until it
    # finishes, so no separate execute call is needed
    tab.execute_insert("print_sink").wait()



if __name__ == '__main__':
    hello_world()
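You can run the script directly in PyCharm, or from the terminal (assuming the file was saved as hello_world.py; the file name is your choice):

$ python3 hello_world.py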

The results are as follows
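The print connector writes every row to stdout as it arrives. The output will look something like the lines below (illustrative only: the `1>` prefix is the index of the printing subtask, and the random values differ on every run):

1> +I[1, 523, a8f3c0d1e2]
1> +I[2, 87, 0b4e9f21aa]
1> +I[3, 661, 7cd02e94bf]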

You are welcome to get in touch to exchange ideas and learn together
