Flink 1.9.0 and later support Python; this Python support is known as PyFlink.

In the latest version, Flink 1.10, PyFlink supports Python user-defined functions, enabling you to register and use them in the Table API and SQL. After hearing all this, you might still wonder what PyFlink’s architecture really is. As a quick guide to PyFlink, this article answers that question.

Why PyFlink?

Flink on Python and Python on Flink

So what exactly is PyFlink? As the name suggests, PyFlink is a combination of Apache Flink and Python, or Flink on Python. But what does Flink on Python mean? First, the combination means that you can use all of Flink’s capabilities in Python. More importantly, PyFlink also allows you to leverage the computing capabilities of Python’s extensive ecosystem on Flink, which in turn helps that ecosystem grow. In other words, it’s a win-win. If you dig a little deeper into this topic, you’ll see that the integration of the Flink framework and the Python language is no coincidence.

Python and the big data ecosystem

The Python language is closely associated with big data. To understand this, we can look at the real problems people solve with Python. User surveys show that most people use Python for data analysis and machine learning, and the big data space offers natural solutions for exactly these scenarios. In addition to expanding the audience for big data products, the integration of Python and big data greatly enhances the Python ecosystem by extending its single-machine architecture to a distributed one. This also explains the strong demand for Python when analyzing large amounts of data.

Why Flink and Python?

The integration of Python and big data is consistent with other recent trends. But, again, why does Flink now support Python instead of Go or R or another language? Also, why do most users choose PyFlink over PySpark and PyHive?

To understand why, let’s first consider some advantages of using the Flink framework:

  • **Favorable architecture:** Flink is a pure stream computing engine with unified streaming and batch capabilities.
  • **New vitality:** Flink was the most active open source project of 2019, according to objective statistics from the ASF.
  • **High reliability:** As an open source project, Flink has been tested over a long period and is widely used in the production environments of big data companies.

Next, let’s look at why Flink supports Python and not some other language. Statistics show that Python is the most popular language after Java and C, and its popularity has been growing rapidly since 2018. Java and Scala are Flink’s default languages, so supporting Python is a reasonable next step.

PyFlink is an inevitable outgrowth of related technology. However, understanding the importance of PyFlink is not enough, because our ultimate goal is to benefit Flink and Python users and solve real problems. Therefore, we need to further explore how to implement PyFlink.

PyFlink architecture

To implement PyFlink, we need to know the key goals to achieve and the core problems to solve. What is PyFlink’s main goal? In short, PyFlink’s main goals are as follows:

  1. Make all Flink features available to Python users.
  2. Run Python’s analysis and computing capabilities on Flink to improve Python’s ability to solve big data problems.

On this basis, let’s analyze the key issues that need to be addressed to achieve these goals.

Make Flink features available to Python users

To implement PyFlink, do we need to develop a Python engine on Flink, like the existing Java engine? The answer is no. This was tried in Flink 1.8 and earlier, but it never worked well. The basic design principle is to achieve the given goals at minimal cost. The simplest but best approach is to provide a layer of Python APIs and reuse the existing computing engine.

So, which Python APIs should we provide for Flink? They are already familiar to us: the high-level Table API and SQL, and the stateful DataStream API. Now that we are getting closer to Flink’s internal logic, the next step is to provide the Table and DataStream APIs for Python. But what key issues remain to be solved?

The key problem

Obviously, the key issue is to establish a handshake between the Python Virtual Machine (PyVM) and the Java Virtual Machine (JVM), which is essential for Flink to support multiple languages. To solve this problem, we must choose the appropriate communication technology.

Select virtual machine communication technology

Currently, there are two solutions available for communication between PyVM and JVM: Beam and Py4J. The former is a well-known project with multi-language and multi-engine support, while the latter is a dedicated solution for communication between PyVM and JVM. We can compare and contrast Apache Beam and Py4J from several different perspectives to see the differences between them. First, consider an analogy: To get over a wall, Py4J will dig a hole in it like a mole, and Apache Beam will pull down the entire wall like a bear. From this perspective, using Apache Beam to implement VM communication is a bit complicated. In short, this is because Apache Beam focuses on generality and lacks flexibility in extreme cases.

In addition, Flink requires interactive programming. Moreover, for Flink to work properly, we also need to ensure semantic consistency in its API design, especially in its multi-language support. Apache Beam’s existing architecture does not meet these requirements, so the obvious conclusion is that Py4J is the best choice for supporting communication between PyVM and JVM.

The technical architecture

After establishing communication between PyVM and JVM, we have achieved our first goal of providing Flink functionality to Python users. We’ve already done this with Flink 1.9. Now, let’s look at the architecture of the PyFlink API in Flink 1.9:

Flink 1.9 uses Py4J for virtual machine communication. We enabled a gateway for the PyVM and a gateway server for the JVM to receive Python requests. The Python API also provides objects such as TableEnvironment and Table, which mirror those in the Java API. Thus, writing the Python API is essentially a matter of calling the Java API. Flink 1.9 also addresses job deployment: it allows you to submit jobs in a variety of ways, such as running Python commands and using the Python shell and the CLI.
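To make this more concrete, here is a minimal sketch of what a Python job looks like under this architecture, assuming a Flink 1.10-style API (the element values and column names are illustrative). Every call below is forwarded through the Py4J gateway to the corresponding Java object in the JVM:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Creating the environments opens the Py4J gateway; the Python objects
# below are thin proxies for their Java counterparts in the JVM.
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Each Table API call is mapped onto the corresponding Java Table API call.
t = t_env.from_elements([(1, 'hello'), (2, 'world')], ['id', 'word'])
result = t.group_by('word').select('word, id.count as cnt')
```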

But what advantages does this architecture offer? First, the architecture is simple and ensures semantic consistency between the Python API and the Java API. Second, it also provides excellent Python job performance comparable to that of Java jobs.

Run Python’s analysis and computation capabilities on Flink

The previous section described how to make Flink features available to Python users. This section explains how to run Python functions on Flink. In general, we can run Python functions on Flink in one of two ways:

  1. **Select a typical Python class library and add its API to PyFlink.** This method would take a long time because Python contains too many class libraries. In addition, before merging any APIs, we would need to streamline Python execution.
  2. **Based on the characteristics of the existing Flink Table API and Python class libraries, treat all existing Python class library functions as user-defined functions and integrate them into Flink** (see the sketch after this list). This is supported in Flink 1.10 and later. What is the key issue for this integration? Again, it is the execution of Python user-defined functions.
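As a rough illustration of the second approach, the sketch below wraps an ordinary function from Python’s standard json library as a Flink user-defined function. The udf wrapper and DataTypes names follow the Flink 1.10-era Python API; the JSON field name is purely hypothetical:

```python
import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Wrap an existing Python library call as a Flink user-defined function.
# The "name" field is an illustrative assumption about the input JSON.
extract_name = udf(lambda s: json.loads(s)['name'],
                   input_types=[DataTypes.STRING()],
                   result_type=DataTypes.STRING())
```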

Next, let’s choose a technique for this critical problem.

Select the technology that performs the user-defined function

In fact, executing Python user-defined functions is very complicated. It involves not only communication between virtual machines, but all of the following: managing the Python execution environment, parsing business data exchanged between Java and Python, passing Flink’s state backends to Python, and monitoring execution state. Given all this complexity, this is where Apache Beam comes into play. As the bear that supports multiple engines and languages, Apache Beam can do a lot to address this situation, so let’s take a look at how Apache Beam handles the execution of Python user-defined functions.

The portability framework, Apache Beam’s highly abstract architecture designed to support multiple languages and engines, is shown below. Currently, Apache Beam supports several different languages, including Java, Go, and Python.

User-defined function architecture

The UDF architecture not only needs to implement communication between PyVM and JVM, but also needs to meet different requirements at compile time and at run time. In the PyFlink user-defined function architecture diagram below, the behavior in the JVM is shown in green and the behavior in the PyVM is shown in blue. Let’s first look at the local design at compile time, which relies on pure API mapping calls; Py4J is used for VM communication.

Now, let’s look at how the Python API and Java API work in this architecture. On the Java side, the JobMaster assigns jobs to the TaskManager as if they were normal Java jobs, and the TaskManager executes the tasks, which involves operator execution in both the JVM and the PyVM. In the Python user-defined function operator, we design various gRPC services for communication between the JVM and the PyVM: for example, a DataService for business data and a StateService that lets Python UDFs call the Java state backend. Many other services, such as logging and metrics, are also provided.

How do we use PyFlink?

Now that you know PyFlink’s architecture and the ideas behind it, let’s take a look at PyFlink’s specific application scenarios to better understand how and why.

Application scenarios of PyFlink

What business solutions does PyFlink support? We can analyze its application scenarios from two perspectives: Python and Java. Keep in mind that PyFlink is also suitable for all situations where Java can be used.

  1. **Event-driven solutions,** such as real-time data monitoring.
  2. **Data analysis,** such as inventory management and data visualization.
  3. **Data pipelines,** also known as ETL schemes, such as log parsing.
  4. **Machine learning,** such as targeted recommendations.

You can use PyFlink in all of these situations. PyFlink is also suitable for Python-specific scenarios, such as scientific computing. With so many application scenarios, you might be wondering which specific PyFlink APIs are available now. Let’s look at that next.

PyFlink installation

Before using any of the APIs, you need to install PyFlink. Currently, to install PyFlink, run the command `pip install apache-flink`.

PyFlink API

The PyFlink API is fully aligned with the Java Table API and supports various relational and windowing operations. Some easy-to-use PyFlink APIs are even more expressive than SQL, such as the APIs for column operations. In addition to these APIs, PyFlink provides a variety of ways to define Python UDFs.
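For example, the column operations below express in one call what would require a full SELECT list in SQL. This is a minimal sketch assuming the Flink 1.10-era expression-string syntax and a table with the listed columns:

```python
# Assume t_env is an existing TableEnvironment.
t = t_env.from_elements([(1, 'hello')], ['id', 'word'])

# Add, rename, and drop columns without rewriting the whole projection.
t = t.add_columns("concat(word, '!') as excited")
t = t.rename_columns("id as record_id")
t = t.drop_columns("word")
```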

User-defined function definitions in PyFlink

ScalarFunction can be extended (for example, by adding metrics) to provide more functionality. In addition, PyFlink user-defined functions support all of the function definitions that Python itself supports, such as lambda functions, named functions, and callable objects.

Once these methods are defined, we can mark them with PyFlink decorators and describe their input and output data types. In later versions, we can also rely on Python’s type-hinting feature to further simplify type derivation. The following example will help you better understand how to define a user-defined function.
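As a sketch of these options, the same addition logic can be declared in several equivalent ways; the names and signatures below follow the Flink 1.10-era pyflink.table.udf module:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf, ScalarFunction

# 1. Extend ScalarFunction (the class form can later be extended,
#    for example with metrics).
class Add(ScalarFunction):
    def eval(self, i, j):
        return i + j

add_class = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

# 2. A named Python function.
def add_func(i, j):
    return i + j

add_named = udf(add_func, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

# 3. A lambda function.
add_lambda = udf(lambda i, j: i + j,
                 [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
```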

A case for defining Python user-defined functions

In this case, we add two numbers. First, we import the necessary classes, and then we define the function in one of the ways mentioned above. This is pretty simple, so let’s see what it looks like in code.
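A minimal end-to-end sketch of this case, assuming the Flink 1.10 decorator API (table contents and names are illustrative):

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Mark the function with the udf decorator and describe its
# input and output data types.
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

# Register the function, then call it like a built-in function; at run
# time it executes in the Python VM via Beam's portability framework.
t_env.register_function("add", add)
t = t_env.from_elements([(1, 2), (3, 4)], ['a', 'b'])
result = t.select("add(a, b)")
```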

What are PyFlink’s future prospects?

In general, business development with PyFlink is simple: you can easily describe the business logic through SQL or the Table API without understanding the underlying implementation. Let’s take a look at PyFlink’s overall outlook.

Goal-driven roadmap

The development of PyFlink has always been driven by two goals: making Flink functionality available to Python users and integrating Python functions into Flink. Following the PyFlink roadmap shown below, we first established communication between PyVM and JVM. Then, in Flink 1.9, we provided the Python Table API, opening up the existing Flink Table API functionality to Python users. In Flink 1.10, we integrate Python functions into Flink by integrating Apache Beam, setting up Python user-defined function execution environments, managing Python’s dependencies on other class libraries, and defining user-defined function APIs so that users can work with Python user-defined functions.

To extend the functionality of distributed Python, PyFlink provides support for Pandas Series and DataFrame, so that users can use Pandas user-defined functions directly in PyFlink. In addition, Python user-defined functions will be enabled in the SQL client in the future to make PyFlink easier to use. PyFlink will also provide a Python ML Pipeline API so that Python users can use PyFlink for machine learning. Monitoring the execution of Python user-defined functions is critical for real production and business use, so PyFlink will further provide metric management for Python user-defined functions. These features are planned for Flink 1.11.

However, these are only part of PyFlink’s plans for the future. There is more work to be done, such as optimizing PyFlink’s performance, providing graph computing APIs, and supporting Pandas’ native APIs on Flink. We will continue to make Flink’s existing capabilities available to Python users and to integrate the power of Python into Flink, in pursuit of our original goal of extending the Python ecosystem.

What are the prospects for PyFlink? As you probably know, PyFlink is part of Apache Flink, and it spans both the runtime layer and the API layer.

How will PyFlink evolve in these two layers? On the runtime side, PyFlink builds general-purpose gRPC services (for control, data, state, and so on) for communication between the JVM and the PyVM. Within this framework, the Java operator for Python user-defined functions is abstracted, and Python execution containers are built to support multiple ways of running Python. For example, Python can run as a process, in a Docker container, or even in an external service cluster. In particular, when running in an external service cluster, sockets enable unlimited scaling. All of this plays a crucial role in subsequent Python integration.

On the API side, we will enable Python-based APIs in Flink to fulfill our mission. This also relies on the Py4J VM communication framework. PyFlink will gradually support more APIs, including Flink’s Java APIs (such as the Python Table API, UDX, ML Pipeline, DataStream, CEP, Gelly, and the State API) and, most popular among Python users, the Pandas API. Based on these APIs, PyFlink will continue to integrate with other ecosystems for easier development, such as Notebook, Zeppelin, Jupyter, and Alink, Alibaba’s open source version of Flink; so far, PyAlink has fully incorporated PyFlink. PyFlink will also be integrated with existing AI system platforms, such as the well-known TensorFlow.

In this sense, PyFlink is here to stay. PyFlink’s mission remains to make Flink functionality available to Python users and to run Python’s analysis and computation capabilities on Flink.
