1 Background
When developing real-time jobs with Flink, we use the DataStream API provided by the framework, which requires users to write their business logic in Java, Scala, or Python. Although this approach is flexible and expressive, it raises the barrier to entry for users, and the DataStream API has introduced many backward incompatibilities across version upgrades. Therefore, Flink SQL has become the best choice for most developers. Flink introduced the SQL API mainly because SQL has the following important features:
- Declarative API: users only describe what to do, not how to do it;
- Automatic optimization: the complexity of the underlying API is hidden, and the engine optimizes execution automatically;
- Simple and easy to understand: SQL is used across many industries and fields, so the learning cost is low;
- Stable: the syntax follows the SQL standard specification and rarely changes;
- Stream-batch unification: the same SQL code can be executed in both stream and batch mode.
Although Flink provides SQL capabilities, teams still need to build their own SQL platform on top of Flink SQL. At present, there are several ways to build such a platform:
- Flink native API: use the SQL API provided by Flink to encapsulate a general pipeline JAR, and submit SQL tasks with Flink's shell scripting tools;
- Apache Zeppelin: an open-source notebook product for managing SQL tasks; it is now integrated with Flink and provides a rich SDK;
- Flink SQL Gateway: an official gateway service that executes Flink SQL over REST.
The first approach lacks flexibility and hits performance bottlenecks when a large number of tasks are submitted. Zeppelin is powerful, but its pages offer limited functionality; building a SQL platform on Zeppelin means either relying on its SDK or heavily re-developing Zeppelin itself. Flink SQL Gateway is therefore the better fit for platform construction: it is an independent gateway service, easy to integrate with a company's existing systems, and completely decoupled from them. This article mainly describes our practice and exploration with Flink SQL Gateway.
2 Flink SQL Gateway Introduction
2.1 Architecture
As shown in the figure above, the architecture of Flink SQL Gateway is relatively simple. The main component is SqlGatewayEndpoint, a Netty service built on Flink's RestServerEndpoint. It handles the creation, deployment, and management of SQL tasks through a set of custom handlers. SqlGatewayEndpoint contains a SessionManager, which maintains a session map; each session mainly holds context configuration and environment information.
- SqlGatewayEndpoint: a Netty service implemented on top of RestServerEndpoint that exposes a REST API externally;
- SessionManager: the session manager, responsible for creating and deleting sessions;
- Session: a session stores the Flink configuration and context information required by a task and is responsible for executing it;
- Classpath: Flink SQL Gateway loads the classpath of the Flink installation directory on startup, so it has essentially no dependencies other than Flink itself.
2.2 Execution Process
The SQL Gateway is essentially an ordinary NIO server. Each handler holds a reference to the SessionManager, so all handlers share the same SessionManager object. When a request arrives, the handler extracts parameters such as the sessionId from the request, looks up the corresponding session in the SessionManager, and then submits SQL or queries task status against it. The request flow is as follows:
1. Create a session. This is the first step in using the SQL Gateway. The SessionManager wraps the task's execution mode, configuration, and planner mode into a Session object, puts it into the map, and returns the sessionId to the user.
2. With the sessionId, the user sends a SQL request. The gateway finds the corresponding Session object by sessionId and starts deploying the SQL job to YARN/Kubernetes.
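The two steps above can be sketched in plain Java. This is a minimal, self-contained simulation of the session flow; the real SqlGatewayEndpoint handlers, Session, and SessionManager classes carry much more state, and every name below is a simplified assumption rather than the gateway's actual interface:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Minimal simulation of the SQL Gateway session flow: create a session,
// get a sessionId back, then submit SQL against that sessionId.
public class GatewayFlowSketch {

    /** Stand-in for Session: holds the task's configuration/context. */
    static class Session {
        final Map<String, String> config;
        Session(Map<String, String> config) { this.config = config; }
    }

    // All handlers share this one map, keyed by sessionId.
    private final Map<String, Session> sessions = new ConcurrentHashMap<>();

    /** Step 1: wrap the execution config into a Session and return its id. */
    public String createSession(Map<String, String> config) {
        String sessionId = UUID.randomUUID().toString();
        sessions.put(sessionId, new Session(config));
        return sessionId;
    }

    /** Step 2: look up the Session by id and "deploy" the SQL job.
     *  Here we only return a status string; the real gateway would
     *  submit the job to YARN/Kubernetes. */
    public String submitSql(String sessionId, String sql) {
        Session session = sessions.get(sessionId);
        if (session == null) {
            return "ERROR: unknown session " + sessionId;
        }
        return "SUBMITTED[" + session.config.getOrDefault("execution.target", "?")
                + "]: " + sql;
    }
}
```

The point of the structure is that the sessionId is the only piece of state the client has to carry between requests; everything else lives server-side in the shared map.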
2.3 Features
2.3.1 Task Deployment
Flink SQL Gateway acts as a Flink client, so task deployment directly reuses Flink's capabilities. Flink currently supports three deployment modes:
- Application Mode;
- Per-Job Mode;
- Session Mode.
There are two differences among the three modes:
- Cluster life cycle and resource isolation: in per-job mode the cluster's life cycle is bound to the job, so resource isolation is guaranteed; in session mode, multiple jobs share one long-running cluster.
- Where the application's main() method runs: in session and per-job modes it runs on the client, while in application mode it runs on the cluster.
As you can see, Application Mode creates a dedicated cluster for each application and executes the application's main() method on the cluster, so it is a compromise between session mode and per-job mode.
So far, Flink supports Application Mode only for JAR tasks. To use Application Mode for SQL tasks, the implementation has to be modified; the approach is explained later.
2.3.2 SQL Capabilities
Flink SQL Gateway supports all Flink SQL syntax, but it has some limitations of its own:
- Multi-statement execution is not supported; executing multiple INSERT INTO statements produces multiple separate jobs;
- SET support is incomplete, and the SET syntax handling is buggy;
- SQL hint support is not very friendly and is error-prone when hints are written in SQL.
3 Platform Transformation
3.1 Application Mode Implementation for SQL
Flink supports Application Mode deployment only for JAR tasks, not for SQL tasks. For a JAR task, Application Mode works as follows:
- flink-clients parses the user's configuration and JAR package information;
- ApplicationConfiguration specifies the entry class name and the input parameters of the main method;
- ApplicationDeployer is responsible for starting the JobManager, which executes the main method of the Flink application on startup.
As can be seen from the process above, the key to implementing Application Mode for SQL is a general-purpose pipeline JAR that executes SQL:
- Implement a general pipeline JAR that executes SQL statements, and pre-upload it to YARN or Kubernetes;
- In ApplicationConfiguration, specify the pipeline JAR's main method entry class and its parameters.
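A sketch of what such a pipeline JAR's entry point could look like. Only the argument handling and statement splitting below are runnable as-is; the Flink calls that would actually execute each statement are indicated in comments, and the argument convention is an assumption for illustration, not the gateway's real interface:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a general-purpose SQL pipeline entry point. In a real pipeline
// JAR, main() would create a TableEnvironment and call executeSql() on each
// statement; here only the statement handling is shown so the sketch stays
// self-contained and runnable without Flink on the classpath.
public class SqlPipelineSketch {

    /** Split a SQL script into individual statements on ';'. A production
     *  splitter must also handle quoted strings and comments. */
    public static List<String> splitStatements(String script) {
        List<String> statements = new ArrayList<>();
        for (String part : script.split(";")) {
            String stmt = part.trim();
            if (!stmt.isEmpty()) {
                statements.add(stmt);
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        // Hypothetical convention: args[0] carries the SQL script, passed
        // through ApplicationConfiguration's program arguments.
        String script = args.length > 0 ? args[0] : "";
        for (String stmt : splitStatements(script)) {
            // A real pipeline JAR would do (Flink API, omitted here):
            //   TableEnvironment tEnv = TableEnvironment.create(settings);
            //   tEnv.executeSql(stmt);
            System.out.println("would execute: " + stmt);
        }
    }
}
```

Because the JAR is generic, one pre-uploaded artifact can serve every SQL task; only the program arguments change per job.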
3.2 Multiple Yarn Clusters
Currently, a Flink installation supports task deployment to only one YARN environment. Supporting multiple YARN environments would mean deploying multiple Flink environments, one per YARN cluster. While this approach solves the problem, it is not optimal. If you are familiar with Flink internals, you know that Flink uses the ClusterClientFactory SPI to generate a ClusterDescriptor for the external resource system (YARN/Kubernetes). The ClusterDescriptor is used to interact with that resource system; for example, a YarnClusterDescriptor holds a YarnClient object and interacts with YARN through it. To support multiple YARN environments, ensure that each YarnClusterDescriptor holds a YarnClient bound to exactly one YARN environment, as shown in the following figure:
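The one-descriptor-per-cluster mapping can be kept in a simple cache keyed by cluster name. The sketch below simulates this with plain Java, using a stand-in class instead of Flink's real YarnClusterDescriptor/YarnClient; all names are illustrative assumptions:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: one descriptor per YARN environment, created lazily and cached,
// so each descriptor's client always talks to exactly one cluster.
public class YarnDescriptorCacheSketch {

    /** Stand-in for YarnClusterDescriptor: bound to a single environment. */
    static class ClusterDescriptor {
        final String yarnConfDir; // e.g. the HADOOP_CONF_DIR of this cluster
        ClusterDescriptor(String yarnConfDir) { this.yarnConfDir = yarnConfDir; }
    }

    private final Map<String, ClusterDescriptor> descriptors = new ConcurrentHashMap<>();

    /** Return the descriptor for the given cluster, creating it on first use
     *  from that cluster's own configuration directory. */
    public ClusterDescriptor descriptorFor(String clusterName, String yarnConfDir) {
        return descriptors.computeIfAbsent(clusterName,
                name -> new ClusterDescriptor(yarnConfDir));
    }
}
```

The design choice mirrors the text: instead of one Flink deployment per YARN cluster, a single service keeps one descriptor per cluster and routes each submission to the right one.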
About the Author
Zheng, Senior Data Platform Engineer at OPPO
Mainly responsible for developing OPPO's real-time computing platform based on Flink, with rich experience in Flink research and development, and a contributor to the Flink community.