This article is compiled from a talk given by Apache Flink PMC member Wu Chong (Yun Xie) at Apache Flink Meetup 2020 · Shanghai. Its purpose is to help users quickly understand the connectivity and simplicity optimizations in Table & SQL, along with best practices for real-world development. It is divided into the following four parts:
- A brief review of the Apache Flink community's development from Flink 1.8 to Flink 1.11, in which the active participation of Chinese developers and the vigorous growth of the Chinese community have contributed significantly to Flink's activity in the community and on GitHub.
- A detailed look at the new features of Flink SQL 1.11, e.g. simplified connector parameters + dynamic table options to reduce code redundancy, built-in connectors + LIKE syntax to facilitate quick testing, refactored TableEnvironment and TableSource/TableSink interfaces to improve ease of use, and Hive dialect + CDC to further support stream-batch unification.
- A highlight of the new real-time Hive data warehouse support and the CDC data synchronization best practices introduced in Flink SQL.
- A brief look at the future plans for Flink SQL 1.12.
Author: Wu Chong (Yun Xie), Apache Flink PMC Member, Alibaba technology expert
Organizer: Chen Jingmin (Qingyue)
Proofreading: Min Ge
1 Flink 1.8 ~ 1.11: community development review
Since Alibaba announced in early 2019 that it would contribute the Blink source code to the Flink community, and Flink 1.8 was released in April of the same year, Flink's community activity has taken off. The number of commits per release has grown by about 50%, attracting a large number of Chinese developers and users to the community. In June, the number of posts on the Chinese mailing list surpassed that of the English mailing list for the first time, and in July the proportion exceeded 50%. Compared with the mailing lists of other Apache open source communities such as Spark and Kafka (around 200 posts per month), the Flink community as a whole remains very healthy and active.
2 Flink SQL new features explained
With this overview of Flink's momentum in mind, let's look at the new connectivity and simplicity features that the recently released Flink 1.11 brings.
FLIP-122: Simplified connector parameters
Flink SQL 1.11 includes a number of usability optimizations, such as FLIP-122, which streamlines the verbose connector property names. Taking Kafka as an example, before 1.11 users had to declare their DDL as follows:
CREATE TABLE user_behavior (
...
) WITH (
'connector.type'='kafka',
'connector.version'='universal',
'connector.topic'='user_behavior',
'connector.startup-mode'='earliest-offset',
'connector.properties.zookeeper.connect'='localhost:2181',
'connector.properties.bootstrap.servers'='localhost:9092',
'format.type'='json'
);
In Flink SQL 1.11, this simplifies to
CREATE TABLE user_behavior (
...
) WITH (
'connector'='kafka',
'topic'='user_behavior',
'scan.startup.mode'='earliest-offset',
'properties.zookeeper.connect'='localhost:2181',
'properties.bootstrap.servers'='localhost:9092',
'format'='json'
);
The DDL carries no less information, but it looks much cleaner 🙂 The Flink developers discussed this optimization at length; see the FLIP-122 discussion thread for details.
FLINK-16743: Built-in connectors
Flink SQL 1.11 adds three new built-in connectors, as shown in the table below:

| connector | Description | Usage scenario |
|---|---|---|
| 'connector'='datagen' | A source that generates random data | Often used for testing |
| 'connector'='blackhole' | A sink that discards all input | Often used for performance testing |
| 'connector'='print' | A sink that prints to the standard output stream (the .out file) | Often used for debugging |
When external connector environments are not ready, users can choose the datagen source and the print sink to quickly build a pipeline and become familiar with Flink SQL. For users who want to test Flink SQL performance, blackhole can be used as the sink. For debugging scenarios, the print sink writes the results to standard output (e.g. the taskmanager.out file in a clustered environment), which greatly reduces the cost of locating problems.
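As a minimal sketch (the table and field names below are made up for illustration), a completely self-contained test pipeline needs nothing beyond these built-in connectors:

```sql
-- datagen produces random rows at a fixed rate; print writes every row to taskmanager.out.
CREATE TABLE gen_orders (
  order_id BIGINT,
  price    DOUBLE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);

CREATE TABLE print_orders (
  order_id BIGINT,
  price    DOUBLE
) WITH (
  'connector' = 'print'
);

-- Submitting this INSERT starts a streaming job that keeps printing generated rows.
INSERT INTO print_orders
SELECT order_id, price FROM gen_orders;
```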
FLIP-110: LIKE syntax
Flink SQL 1.11 allows users to quickly "fork" their own version of a table from an existing DDL and modify attributes such as the watermark or connector options. For example, to add a watermark to base_table before Flink 1.11, you had to redeclare the entire table and apply your changes:
-- before Flink SQL 1.11
CREATE TABLE base_table (
id BIGINT,
name STRING,
ts TIMESTAMP
) WITH (
'connector.type'='kafka',
...
);
CREATE TABLE derived_table (
id BIGINT,
name STRING,
ts TIMESTAMP,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector.type'='kafka',
...
);
Starting with Flink 1.11, users can achieve this with just the CREATE TABLE LIKE syntax:
-- Flink SQL 1.11
CREATE TABLE base_table (
id BIGINT,
name STRING,
ts TIMESTAMP
) WITH (
'connector'='kafka',
...
);
CREATE TABLE derived_table (
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) LIKE base_table;
When the built-in connectors are combined with the CREATE TABLE LIKE syntax, the two are an extremely powerful mix, as illustrated in the figure below, and greatly improve development efficiency.
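For instance, here is a minimal sketch (reusing the base_table from the snippet above) that forks the same schema into a print sink for debugging and a blackhole sink for performance testing:

```sql
-- Same schema as base_table, but every row is printed to taskmanager.out.
CREATE TABLE print_sink WITH (
  'connector'='print'
) LIKE base_table (EXCLUDING ALL);

-- Same schema again, but all rows are simply discarded.
CREATE TABLE blackhole_sink WITH (
  'connector'='blackhole'
) LIKE base_table (EXCLUDING ALL);

INSERT INTO print_sink SELECT * FROM base_table;
```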
FLIP-113: Dynamic table options
For a message queue such as Kafka, the DDL usually specifies a startup offset that determines where consumption begins. If the startup offset needs to change, the DDL has to be re-declared with the new offset, which was very inconvenient in older versions.
CREATE TABLE user_behavior (
user_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector'='kafka',
'topic'='user_behavior',
'scan.startup.mode'='timestamp',
'scan.startup.timestamp-millis'='123456',
'properties.bootstrap.servers'='localhost:9092',
'format'='json'
);
Starting with Flink 1.11, users can enable dynamic table options in the SQL Client as follows (they are disabled by default), and then specify a particular startup offset directly in the DML via a hint.
SET 'table.dynamic-table-options.enabled' = 'true';
SELECT user_id, COUNT(DISTINCT behavior)
FROM user_behavior /*+ OPTIONS('scan.startup.timestamp-millis'='1596282223') */
GROUP BY user_id;
Besides the startup offset, dynamic table options support many other runtime options, such as sink.partitioner and scan.startup.mode. For more information, please refer to FLIP-113.
FLIP-84: Refactored TableEnvironment interface
Before Flink SQL 1.11, the definition and behavior of the TableEnvironment interface was somewhat unclear. For example:

- TableEnvironment#sqlUpdate() executes DDL immediately, but buffers INSERT INTO DML statements until TableEnvironment#execute() is called, so statements that appear to the user to run sequentially may actually behave quite differently.
- There are two entry points for triggering job submission, TableEnvironment#execute() and StreamExecutionEnvironment#execute(), and it is hard for users to know which one should be called.
- Multiple INSERT INTO statements cannot be executed in a single call.
To address these issues, Flink SQL 1.11 provides a new API, TableEnvironment#executeSql(), which unifies SQL execution: whether it receives DDL, a query, or an INSERT INTO, the statement is executed immediately. For multi-sink scenarios, the StatementSet and TableEnvironment#createStatementSet() methods are provided, allowing users to add multiple INSERT statements and execute them together.
In addition, the new execute methods have a return value, on which users can call print, collect, and other methods.
A comparison of the old and new APIs is shown in the table below:
| Current Interface | New Interface |
|---|---|
| tEnv.sqlUpdate("CREATE TABLE ..."); | TableResult result = tEnv.executeSql("CREATE TABLE ..."); |
| tEnv.sqlUpdate("INSERT INTO ... SELECT ..."); tEnv.execute(); | TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ..."); |
| tEnv.sqlUpdate("insert into xx ..."); tEnv.sqlUpdate("insert into yy ..."); tEnv.execute(); | StatementSet ss = tEnv.createStatementSet(); ss.addInsertSql("insert into xx ..."); ss.addInsertSql("insert into yy ..."); TableResult result = ss.execute(); |
For some common problems encountered when using the new interfaces in Flink 1.11, Yun Xie has provided answers, which can be found in the Appendix.
FLIP-95: TableSource & TableSink refactoring
In Flink SQL 1.11, the developers spent a lot of effort refactoring the TableSource and TableSink APIs. The core improvements are as follows:
- Remove type-specific interfaces, simplifying development, resolving confusing type problems, and supporting all types
- Clearer error messages when looking up a factory
- Fixes the problem of failing to find the primary key
- Unified stream and batch sources and sinks
- Support for reading and producing CDC data
- Direct and efficient generation of Flink SQL's internal data structure RowData
The old TableSink API, shown in the figure below, had six type-related methods and was riddled with deprecated methods, which caused connector bugs. The new DynamicTableSink API removes all type-related interfaces, because all types are derived from the DDL and the TableSink no longer needs to tell the framework what type it produces. For users, the most noticeable effect is that the odd errors frequently hit on older versions are far less likely: for example, unsupported precision types and "cannot find primary key / table factory" errors no longer occur in the new version. Details of how Flink 1.11 addresses these issues can be found in the Appendix.
FLIP-123: Hive Dialect
Flink 1.10 made the Hive connector production-ready, but earlier versions of Flink SQL did not support Hive DDL or Hive syntax, which limited Flink's connectivity. In the new version, the developers introduced a new parser that supports HiveQL. Users can specify whether to use Hive syntax in the SQL Client's YAML file, or switch dynamically in the SQL Client by running SET table.sql-dialect=hive (or default). For more information, see FLIP-123.
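As a small sketch (the table definition is purely illustrative and assumes a configured Hive catalog), switching the dialect in the SQL Client looks like this:

```sql
-- Switch to Hive syntax, run a Hive-style DDL, then switch back to the default Flink dialect.
SET table.sql-dialect=hive;

CREATE TABLE hive_logs (
  user_id STRING,
  message STRING
) PARTITIONED BY (dt STRING) STORED AS parquet;

SET table.sql-dialect=default;
```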
That is a quick tour of the connectivity and simplicity optimizations in Flink 1.11 that reduce unnecessary user input and operations. Next, two core connectivity and simplicity improvements related to external systems and the data ecosystem are highlighted, along with best practices.
3 Real-time Hive data warehouse & Flink SQL + CDC best practices
FLINK-17433: Real-time Hive data warehouse
Below is the classic Lambda data warehouse architecture, which for years represented the state of the art as the big data industry moved from batch processing to stream computing. However, as the business grows and expands, the development, operations, and compute costs of maintaining two separate architectures become increasingly prominent.
Flink, as a unified stream and batch computing engine, was designed from the start around the idea that everything is essentially a stream and that batch processing is a special case of stream computing. If it can offer efficient batch capability while integrating with the existing big data ecosystem, it can transform existing data warehouse architectures in a minimally invasive way to support a unified stream-batch model. In the new release, Flink SQL provides out-of-the-box Hive data warehouse synchronization: all data processing logic is executed by Flink SQL in streaming mode, and on the write side the processed data of the ODS, DWD, and DWS layers is automatically streamed back to Hive tables in real time. This "one size (SQL) fits all (tables)" design removes the need to maintain any computation pipelines at the batch layer.
What are the benefits compared with the traditional architecture, and which problems does it solve?
- Unified computation semantics and processing logic, reducing development, operations, and maintenance costs

  The biggest problem with maintaining two data pipelines in the traditional architecture is keeping their processing logic equivalent. Because different compute engines are used (for example Hive offline and Flink or Spark Streaming in real time), the same SQL often cannot be reused directly and the code differs. Over time the offline and real-time processing logic may diverge completely; some large companies even have two separate teams maintaining the real-time and offline data warehouses, at very high human and material cost. With Flink's Hive streaming sink, real-time processing results can be streamed back to Hive tables in real time, the offline computation layer can be removed entirely, and the processing logic is maintained only in Flink SQL (a minimal pipeline is sketched after this list). The offline layer then only needs to run ad-hoc queries against the already streamed-back ODS, DWD, and DWS tables.
- More natural handling of cross-partition "data drift", and a real-time offline warehouse

  Because an offline warehouse pipeline is schedule-driven rather than data-driven, handling data that crosses partition boundaries usually requires many tricks to guarantee partition completeness, and with two parallel warehouse architectures the results sometimes fail to reconcile because late events are handled differently. With a real-time, data-driven processing model and Flink's first-class event-time support, business time can be used as the partition (window), and the completeness and timeliness of both real-time and offline data can be defined uniformly through event time plus watermarks. The Hive streaming sink thereby solves the "last mile" of offline data warehouse synchronization.
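As a hedged sketch of such a pipeline (table names, fields, and connection values are placeholders; the partition-commit options follow the Flink 1.11 filesystem/Hive sink documentation), a Kafka source can be streamed back into a partitioned Hive table roughly like this:

```sql
-- Declare the Hive table using the Hive dialect introduced by FLIP-123.
SET table.sql-dialect=hive;
CREATE TABLE hive_user_behavior (
  user_id  BIGINT,
  behavior STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- Back to the default dialect for the Kafka source and the streaming INSERT.
SET table.sql-dialect=default;
CREATE TABLE kafka_user_behavior (
  user_id  BIGINT,
  behavior STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Partitions are committed by event time (watermark) rather than by an external scheduler.
INSERT INTO hive_user_behavior
SELECT user_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
FROM kafka_user_behavior;
```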
FLIP-105: Support for Change Data Capture (CDC)
In addition to the Hive streaming sink, another highlight of Flink SQL 1.11 is the introduction of the CDC mechanism. Change Data Capture (CDC) tracks inserts, updates, and deletes on database tables and is a very mature solution for synchronizing database changes. A common CDC tool in China is Canal, open-sourced by Alibaba, while Debezium is more popular internationally. From the very beginning, Flink SQL was designed around the concepts of the dynamic table and stream-table duality, and it fully supports changelog processing, which is an important advantage over other open source stream computing systems. In essence, a changelog is equivalent to a constantly changing database table, and the dynamic table concept is the cornerstone of Flink SQL. What flows between Flink SQL operators is changelog data, with full support for INSERT, DELETE, and UPDATE messages.
Thanks to the power of the Flink SQL runtime, connecting Flink with CDC only requires converting the external change stream into Flink's internal INSERT, DELETE, and UPDATE messages. Once inside Flink, users can flexibly apply Flink's full range of query syntax to the data.
In practice, if you register a Debezium Kafka Connect service against a Kafka cluster together with the database tables you want to synchronize, Debezium listens to the binlog and writes the changes to Kafka topics that are created automatically. Consuming this data on the Flink side is also easy: simply declare 'format' = 'debezium-json' in the DDL.
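As a minimal sketch (the topic and field names are hypothetical), such a changelog source is declared like any other Kafka table, only with the debezium-json format:

```sql
-- Each Debezium message is interpreted as an INSERT / UPDATE / DELETE change on this table.
CREATE TABLE mysql_users (
  id    BIGINT,
  name  STRING,
  email STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mydb.users',                       -- topic written by Debezium (hypothetical)
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```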
In Flink 1.11 the developers also did some interesting exploration: since the Flink SQL runtime fully supports changelogs, is it possible for Flink to read MySQL changes directly, without a Debezium or Canal service in between? The Debezium library is well designed, so its API can be wrapped as a Flink SourceFunction without running any additional service. This work is now open source as the flink-cdc-connectors project (https://github.com/ververica/flink-cdc-connectors); it supports CDC reads from MySQL and Postgres, and more databases will be supported in the future.
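As a hedged example based on the flink-cdc-connectors documentation (host, credentials, and the table schema below are placeholders), a MySQL orders table can then be consumed directly as a changelog source:

```sql
-- 'mysql-cdc' reads a snapshot of the table and then its binlog, with no Kafka or Debezium service in between.
CREATE TABLE orders (
  order_id      INT,
  customer_name STRING,
  price         DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);
```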
The following demo shows how to use flink-cdc-connectors to capture data changes from MySQL and Postgres and, with a multi-stream join in Flink SQL, synchronize them into Elasticsearch.
Suppose you work at an e-commerce company where orders and shipments are core data, and you want to analyze the delivery status of orders in real time. Because the company has grown so large, product, order, and shipment information is scattered across different databases and tables. We need to build a streaming ETL job that consumes the full and incremental data of all these databases in real time and joins them into one wide table, making downstream analysis by data analysts much easier.
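Continuing the sketch above (the shipments table, the Elasticsearch index, and all connection parameters are again placeholders), the wide table can be produced with an ordinary streaming join and continuously upserted into Elasticsearch:

```sql
-- A second CDC source from Postgres.
CREATE TABLE shipments (
  shipment_id INT,
  order_id    INT,
  origin      STRING,
  destination STRING,
  is_arrived  BOOLEAN,
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);

-- The Elasticsearch sink holding the joined wide table; rows are upserted by primary key.
CREATE TABLE enriched_orders (
  order_id      INT,
  customer_name STRING,
  price         DECIMAL(10, 2),
  origin        STRING,
  destination   STRING,
  is_arrived    BOOLEAN,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'enriched_orders'
);

-- Join the two changelog streams; updates and deletes in either source are reflected in Elasticsearch.
INSERT INTO enriched_orders
SELECT o.order_id, o.customer_name, o.price, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN shipments AS s ON o.order_id = s.order_id;
```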
4 Future plans for Flink SQL 1.12
The above introduced the core features and best practices of Flink SQL 1.11. For the next version, Yun Xie also presented some plans under way, and everyone is welcome to actively share comments and suggestions in the community.
- FLIP-132: Temporal Table DDL (dimension table joins in binlog mode)
- FLIP-129: Refactored Descriptor API (Table API DDL)
- Support for the Schema Registry Avro format
- Better CDC support (batch processing, upsert output to Kafka or Hive)
- Optimization of the Streaming File Sink small-file issue
- N-ary input operator (batch performance improvement)
5 Appendix
Common errors encountered with the new TableEnvironment and their causes
The first error is "No operators defined in streaming topology". It occurs because, in older versions, an INSERT INTO statement was executed with the following two calls:

TableEnvironment#sqlUpdate()
TableEnvironment#execute()

The new version is not fully backward compatible here (the methods still exist, but their execution logic has changed). If the Table has not been converted to an AppendStream/RetractStream (via StreamTableEnvironment#toAppendStream/toRetractStream), the code above now triggers this error; and once such a conversion has been made, StreamExecutionEnvironment#execute() must be used to trigger job execution. Users are therefore advised to migrate to the new version of the API, whose semantics are clearer.
The second problem is that calling print on the return value of the new TableEnvironment#executeSql() shows no results. The reason is that print currently relies on the checkpoint mechanism and requires exactly-once checkpointing to be enabled; a later version will optimize this.
Common errors with the old StreamTableSource / StreamTableSink and how the new version improves on them
The first common error is unsupported precision types, which usually occurs with JDBC or HBase data sources. This problem no longer occurs with the new release.
The second common error is that the sink cannot find the primary key. This is because the old StreamSink had to derive the primary key from the query, and when the query became complex the primary key information could be lost, even though it is actually available in the DDL. Since the new sink no longer derives it from the query, this error no longer occurs.
The third common error occurred when parsing a source or sink: if the user provided too few parameters or a wrong parameter, the framework returned a very vague error message, "table factory cannot be found", leaving the user with no idea how to fix it. This is because the old SPI was designed very generically and did not handle source and sink parsing separately: whenever the full parameter list could not be matched, the framework assumed that no matching table factory existed. With the new loading logic, Flink first determines the connector type and only then matches the remaining parameters, so if a required parameter is missing or incorrectly filled, the framework can report a precise error to the user.