This article was compiled by community volunteer Chen Zhengyu. The Apache Flink community released version 1.13 in May, bringing many new changes. The article is adapted from “Flink SQL 1.13 in Depth” shared by Xu Bangjiang (Xuejin) at Flink Meetup in Beijing on May 22. It includes:
- Flink SQL 1.13 Overview
- Core feature interpretation
- Key improvement interpretation
- Flink SQL 1.14 future planning
- Summary
I. Flink SQL 1.13 overview
Flink 1.13 is a large community release that resolved more than 1,000 issues. As the figure above shows, most of them concern the Table/SQL module: more than 400 issues, about 37% of the total. These issues mainly revolve around five FLIPs, and this article is organized around them:
- FLIP-145: Window TVF
- FLIP-162: Time zones and time functions
- FLIP-152: Hive syntax compatibility
- FLIP-163: SQL Client improvements
- FLIP-136: DataStream and Table conversion
Here is a closer look at these FLIPs.
II. Core feature interpretation
1. FLIP-145: Window TVF support
Those who follow the community will know that Tencent, Alibaba, ByteDance and other companies had already developed a basic version of this feature. In Flink 1.13 the community released Window TVF support together with related optimizations. This section examines the feature from four angles: Window TVF syntax, near-real-time cumulative computation, window performance optimization, and multidimensional data analysis.
1.1 Window TVF syntax
Before version 1.13, windows were implemented with a special SqlGroupedWindowFunction:
SELECT
  TUMBLE_START(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_END(bidtime, INTERVAL '10' MINUTE),
  TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE),
  SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE)
In version 1.13, the syntax was standardized as a table-valued function (TVF):
SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time
Comparing the two, the TVF syntax is more flexible: windowing no longer has to be tied to the GROUP BY keyword. Window TVF is also based on relational algebra, which makes it more standard. When you only need to assign rows to windows, you can use the TVF alone, without a GROUP BY aggregation. This makes TVF more extensible and expressive, and allows custom TVFs (for example, a TVF that implements Top-N).
The example in the figure above assigns rows to tumbling windows with a TVF: the data is only divided into windows, with no aggregation. If aggregation is needed afterwards, a plain GROUP BY is enough. This also feels natural to users familiar with batch SQL, because window assignment and aggregation no longer have to be bound together by the special SqlGroupedWindowFunction, as they were before version 1.13.
Window TVF currently supports tumble, hop and cumulate windows. Session windows are expected to be supported in version 1.14.
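To make "assigning rows to windows without aggregating" concrete, here is a minimal Java sketch of what a tumbling window TVF does conceptually: for each row it derives the window_start, window_end and window_time values that are appended as extra columns. The class and method names are hypothetical, the zone-less timestamp is treated as UTC purely for the arithmetic, and this is an illustration of the semantics, not Flink's implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical sketch of the columns a tumbling window TVF appends to a row. */
class TumbleAssignSketch {

    /** Returns {window_start, window_end, window_time} for the window containing ts. */
    static LocalDateTime[] assign(LocalDateTime ts, Duration size) {
        long sizeMs = size.toMillis();
        // Treat the wall-clock value as UTC only to do epoch arithmetic.
        long tsMs = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        // Windows are aligned to the epoch: round down to a multiple of the size.
        long startMs = tsMs - Math.floorMod(tsMs, sizeMs);
        LocalDateTime start =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(startMs), ZoneOffset.UTC);
        LocalDateTime end = start.plus(size);
        // window_time is the last millisecond that still belongs to the window.
        return new LocalDateTime[] {start, end, end.minusNanos(1_000_000L)};
    }

    public static void main(String[] args) {
        LocalDateTime[] w =
                assign(LocalDateTime.parse("2021-05-22T10:07:30"), Duration.ofMinutes(10));
        System.out.println("window_start=" + w[0]);
        System.out.println("window_end=" + w[1]);
        System.out.println("window_time=" + w[2]);
    }
}
```

A row with bidtime 10:07:30 lands in the window from 10:00 to 10:10. Any later GROUP BY on window_start and window_end is an ordinary aggregation over these extra columns.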
1.2 Cumulate Window
A cumulate window is a cumulative window. Put simply, each interval on the time axis in the diagram above is one window step:
- The first window covers the first interval;
- The second window covers the first and second intervals;
- The third window covers the first, second and third intervals.
Cumulative computation is common in business scenarios, for example cumulative UV. On a UV dashboard curve, we report the day's cumulative UV every 10 minutes.
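The following minimal Java sketch shows which cumulate windows a single record falls into for a 10-minute step and a 1-day maximum size. The class and method names are hypothetical, the maximum size is assumed to be a whole multiple of the step, and the code only illustrates the window semantics described above rather than Flink's operator.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of cumulate window assignment. */
class CumulateWindowSketch {

    /** window_end of every cumulate window containing ts; all share one window_start. */
    static List<LocalDateTime> windowEnds(LocalDateTime ts, Duration step, Duration maxSize) {
        long stepMs = step.toMillis();
        long tsMs = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        // All windows of one cycle start at the same epoch-aligned boundary.
        long startMs = tsMs - Math.floorMod(tsMs, maxSize.toMillis());
        // The first window that contains ts ends at the next step boundary after ts.
        long firstEndMs = startMs + (Math.floorDiv(tsMs - startMs, stepMs) + 1) * stepMs;
        List<LocalDateTime> ends = new ArrayList<>();
        for (long end = firstEndMs; end <= startMs + maxSize.toMillis(); end += stepMs) {
            ends.add(LocalDateTime.ofInstant(Instant.ofEpochMilli(end), ZoneOffset.UTC));
        }
        return ends;
    }

    public static void main(String[] args) {
        // A record late in the day belongs only to the last few windows of that day.
        System.out.println(windowEnds(
                LocalDateTime.parse("2021-05-22T23:35:00"),
                Duration.ofMinutes(10),
                Duration.ofDays(1)));
    }
}
```

A record at 23:35 contributes to the windows ending at 23:40, 23:50 and 24:00, while a record at 00:05 contributes to all 144 windows of the day. This is why the value reported at each 10-minute mark is cumulative.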
Before version 1.13, this kind of calculation was typically written in SQL as follows:
INSERT INTO cumulative_UV
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) AS UV
FROM (
  SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd') AS date_str,
    -- truncate HH:mm to a 10-minute bucket, e.g. 12:34 becomes 12:30
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'), 1, 4) || '0' AS time_str,
    user_id
  FROM user_behavior
)
GROUP BY date_str
Each record is first tagged with time-window fields built by string concatenation, and all records are then aggregated with GROUP BY on those fields, which approximates cumulative computation.
- The pre-1.13 approach has several drawbacks. First, a computation is triggered for every incoming record. Second, when the job catches up on a backlog of historical data, the UV dashboard curve jumps.
- Version 1.13 supports the TVF syntax. With a cumulate window the query can be rewritten as follows: each record is assigned precisely to its windows by event time, each window's computation is triggered by the watermark, and the curve does not jump even when catching up on data.
INSERT INTO cumulative_UV
SELECT window_end, COUNT(DISTINCT user_id) AS UV
FROM TABLE(
  CUMULATE(TABLE user_behavior, DESCRIPTOR(ts), INTERVAL '10' MINUTES, INTERVAL '1' DAY))
GROUP BY window_start, window_end
The resulting UV dashboard curve is shown in the figure below:
1.3 Window performance optimization
In Flink 1.13 the community implemented a number of performance optimizations for Window TVF, including:
- **Memory optimization:** window data is buffered in pre-allocated memory and computation is triggered by the window watermark; the memory buffer avoids high-frequency state access;
- **Slice optimization:** windows are split into slices and computed results are reused as much as possible. For hop and cumulate windows, a slice that has already been computed does not need to be computed again; its result is simply reused;
- **Operator optimization:** the window operator supports local-global optimization, as well as automatic hot-key mitigation for COUNT(DISTINCT);
- **Late data:** late records are computed into subsequent slices, which keeps the results accurate.
Based on these optimizations, we ran performance tests with the open-source Nexmark benchmark. The results show that window performance improved about 2x in general cases, with even larger gains in COUNT(DISTINCT) scenarios.
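The slice idea from the list above can be sketched in a few lines of Java. The sketch aggregates every record into exactly one slice (slice length equal to the hop slide) and then builds each hopping window by merging consecutive slice results, so no record is aggregated more than once. The class name, method name and the minute-based time unit are hypothetical, and the window size is assumed to be a whole multiple of the slide; Flink's actual operator is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch of slice sharing for hopping windows. */
class SliceSharingSketch {

    /**
     * records: {eventTimeInMinutes, value} pairs.
     * Returns "windowStart,windowEnd,sum" lines ordered by window start.
     */
    static List<String> hopSums(long[][] records, long sizeMin, long slideMin) {
        // Step 1: pre-aggregate each record into the single slice it belongs to.
        TreeMap<Long, Long> sliceSums = new TreeMap<>();
        for (long[] r : records) {
            long sliceStart = r[0] - Math.floorMod(r[0], slideMin);
            sliceSums.merge(sliceStart, r[1], Long::sum);
        }
        List<String> out = new ArrayList<>();
        if (sliceSums.isEmpty()) {
            return out;
        }
        // Step 2: every window is the merge of size/slide consecutive slices.
        long slicesPerWindow = sizeMin / slideMin;
        long firstStart = sliceSums.firstKey() - (slicesPerWindow - 1) * slideMin;
        for (long start = firstStart; start <= sliceSums.lastKey(); start += slideMin) {
            long sum = 0;
            boolean hasData = false;
            for (long s = start; s < start + sizeMin; s += slideMin) {
                Long sliceSum = sliceSums.get(s); // reuse the slice result
                if (sliceSum != null) {
                    sum += sliceSum;
                    hasData = true;
                }
            }
            if (hasData) {
                out.add(start + "," + (start + sizeMin) + "," + sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] records = {{61, 10}, {67, 20}, {72, 5}};
        hopSums(records, 10, 5).forEach(System.out::println);
    }
}
```

With a 10-minute window and a 5-minute slide, the slice starting at minute 65 is computed once and reused by both the window from 60 to 70 and the window from 65 to 75.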
1.4 Multidimensional data analysis
Standardized syntax brings more flexibility and extensibility: users can perform multidimensional analysis directly on window functions. As shown in the figure below, GROUPING SETS, ROLLUP and CUBE can be used directly. Prior to 1.13, we had to run a separate SQL aggregation for each grouping and then UNION the results to achieve something similar. Now this kind of multidimensional analysis is supported directly on Window TVF.
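To see which separate aggregations ROLLUP and CUBE stand for, the minimal Java sketch below expands a column list into its grouping sets using the standard SQL definitions. The class name and the column names supplier_id and item are hypothetical. On a window TVF, each of these sets would additionally be grouped by window_start and window_end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: the grouping sets that ROLLUP and CUBE are shorthand for. */
class GroupingSetsSketch {

    /** ROLLUP(c1, ..., cn) expands to the n + 1 prefixes of the list, longest first. */
    static List<List<String>> rollup(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int n = cols.size(); n >= 0; n--) {
            sets.add(new ArrayList<>(cols.subList(0, n)));
        }
        return sets;
    }

    /** CUBE(c1, ..., cn) expands to all 2^n subsets of the list. */
    static List<List<String>> cube(List<String> cols) {
        List<List<String>> sets = new ArrayList<>();
        for (int mask = (1 << cols.size()) - 1; mask >= 0; mask--) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < cols.size(); i++) {
                if ((mask & (1 << i)) != 0) {
                    set.add(cols.get(i));
                }
            }
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("supplier_id", "item");
        System.out.println("ROLLUP -> " + rollup(cols));
        System.out.println("CUBE   -> " + cube(cols));
    }
}
```

Each printed set corresponds to one of the separate aggregations that previously had to be written by hand and combined with UNION.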
1.5 Window Top-N support
In addition to multidimensional analysis, Window TVF also supports the Top-N syntax, which makes Top-N queries over windows easier to write.
2. FLIP-162: Time zones and time functions
2.1 Time Zone Analysis
Many time zone related problems have been reported by Flink SQL users. They boil down to three causes:
- The PROCTIME() function should take the time zone into account, but does not;
- The CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() functions do not take the time zone into account;
- Flink time attributes could only be defined on the TIMESTAMP data type, which carries no time zone. TIMESTAMP ignores the time zone, but users expect time in their local time zone.
To address the fact that TIMESTAMP ignores time zones, we proposed the TIMESTAMP_LTZ type (short for TIMESTAMP WITH LOCAL TIME ZONE). It can be compared with TIMESTAMP in the following table:
TIMESTAMP_LTZ differs from the TIMESTAMP we used before in that it represents absolute time. Comparing the two, we find that:
- A TIMESTAMP value can be thought of as a string: it reads the same whether the user observes it from the UK time zone or the China time zone;
- A TIMESTAMP_LTZ value originates from a Long that represents the time elapsed since the epoch. Because that elapsed time is the same in every time zone, the Long value is a notion of absolute time. When the value is observed in different time zones, it is rendered in the readable "year-month-day hour:minute:second" format using the local time zone. This is the TIMESTAMP_LTZ type, and it better matches what users in different time zones expect.
The following example shows the difference between TIMESTAMP and TIMESTAMP_LTZ.
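One way to see the difference is through the Java types that Flink's Table API uses by default for the two SQL types: java.time.LocalDateTime for TIMESTAMP and java.time.Instant for TIMESTAMP_LTZ. The sketch below is plain Java with hypothetical class and method names, and the sessionZone parameter is an assumption standing in for the session time zone. It shows how each kind of value is rendered under two different zones.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch contrasting wall-clock and absolute timestamps. */
class TimestampTypesSketch {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** TIMESTAMP-like value: a wall-clock reading, so the session zone is ignored. */
    static String showTimestamp(LocalDateTime value, ZoneId sessionZone) {
        return value.format(FMT);
    }

    /** TIMESTAMP_LTZ-like value: an absolute instant rendered in the session zone. */
    static String showTimestampLtz(Instant value, ZoneId sessionZone) {
        return value.atZone(sessionZone).format(FMT);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.parse("2021-05-22T02:00:00");
        Instant ltz = Instant.parse("2021-05-22T02:00:00Z");
        for (String id : new String[] {"UTC", "Asia/Shanghai"}) {
            ZoneId zone = ZoneId.of(id);
            System.out.println(id
                    + "  TIMESTAMP=" + showTimestamp(ts, zone)
                    + "  TIMESTAMP_LTZ=" + showTimestampLtz(ltz, zone));
        }
    }
}
```

The TIMESTAMP column prints 2021-05-22 02:00:00 under both zones, while the TIMESTAMP_LTZ column prints 02:00:00 under UTC and 10:00:00 under Asia/Shanghai for the same underlying instant.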
2.2 Time function corrections
Correcting the PROCTIME() function
With TIMESTAMP_LTZ available, we corrected the return type of PROCTIME(): before version 1.13 it always returned a TIMESTAMP in UTC, and now it returns TIMESTAMP_LTZ. Besides being a function, PROCTIME can also mark a time attribute.
Correcting the CURRENT_TIMESTAMP/CURRENT_TIME/CURRENT_DATE/NOW() functions
The values of these functions now vary with the time zone. For example, when it is 2 a.m. in the UK (UTC), the functions return 10 a.m. if the time zone is set to UTC+8. The result is shown in the following figure:
Fixing the processing-time window time zone problem
As mentioned, PROCTIME can represent a time attribute.
- Prior to version 1.13, a daily window on processing time required working around the time zone by hand: adding an 8-hour offset and then subtracting it again;
- FLIP-162 solves this problem. Users now simply declare the PROCTIME attribute; because PROCTIME() returns TIMESTAMP_LTZ, the result takes the local time zone into account. The example below shows windows on a PROCTIME attribute being aggregated by local time in different time zones.
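The two approaches in the list above can be compared with a minimal Java sketch. The manualOffsetDayStart method mimics the old workaround of shifting by a fixed offset and shifting back, while dayWindow derives the boundaries of a one-day window from the local calendar day of a given zone. The class and method names are hypothetical, and the code models the semantics only, not Flink's window operator.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

/** Hypothetical sketch of daily windows aligned to local midnight. */
class LocalDayWindowSketch {

    /** Zone-aware: {window_start, window_end} of the local day that contains t. */
    static Instant[] dayWindow(Instant t, ZoneId zone) {
        LocalDate day = t.atZone(zone).toLocalDate();
        return new Instant[] {
            day.atStartOfDay(zone).toInstant(),
            day.plusDays(1).atStartOfDay(zone).toInstant()
        };
    }

    /** Old-style workaround: add a fixed offset, floor to a UTC day, subtract it again. */
    static Instant manualOffsetDayStart(Instant t, Duration offset) {
        long dayMs = Duration.ofDays(1).toMillis();
        long shifted = t.toEpochMilli() + offset.toMillis();
        return Instant.ofEpochMilli(shifted - Math.floorMod(shifted, dayMs) - offset.toMillis());
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("2021-05-21T17:30:00Z"); // 01:30 on May 22 in Shanghai
        for (String id : new String[] {"UTC", "Asia/Shanghai"}) {
            Instant[] w = dayWindow(t, ZoneId.of(id));
            System.out.println(id + ": [" + w[0] + ", " + w[1] + ")");
        }
        System.out.println("manual +8h: " + manualOffsetDayStart(t, Duration.ofHours(8)));
    }
}
```

For Asia/Shanghai both approaches yield a window starting at 2021-05-21T16:00:00Z, which is local midnight on May 22. The zone-aware version needs no hard-coded offset.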
Time function evaluation in streaming and batch mode
Time functions behave differently in streaming and batch jobs; this revision mainly brings them in line with what users actually expect. For example:
- In streaming mode, functions are evaluated per record, that is, once for every row;
- In batch mode, functions are evaluated at query start, that is, once before the job starts. Other batch engines, such as Hive, likewise evaluate them once before each batch starts.
2.3 Time Type Usage
Version 1.13 also supports defining event time on a TIMESTAMP_LTZ column, so event time can now be defined on both TIMESTAMP and TIMESTAMP_LTZ columns. Which type should a user choose in which scenario?
- When the upstream source carries time as a string (for example 2021-4-15 14:00:00), declare the column as TIMESTAMP and define event time on it. Windows are then divided based on the time string, and the result matches what you expect;
- When the upstream source carries time as a Long value, it denotes absolute time. In version 1.13 you can define event time on a TIMESTAMP_LTZ column. All window aggregations defined on TIMESTAMP_LTZ then handle the 8-hour time zone offset automatically, with no need to adjust the time zone by hand in SQL as before.
Note: these improvements to time functions and time zone support in Flink SQL are not backward compatible. When upgrading, check whether your job logic uses these functions to avoid business impact after the upgrade.
2.4 Daylight saving time (DST) support
Prior to Flink 1.13, window computations were difficult for users in time zones that observe daylight saving time, because of the switch between daylight saving time and standard (winter) time.
Flink 1.13 supports daylight saving time gracefully by allowing time attributes to be defined on TIMESTAMP_LTZ columns, and Flink SQL combines the TIMESTAMP and TIMESTAMP_LTZ types in window processing. This is useful for users in DST time zones and for companies with overseas business.
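Why DST makes fixed-offset arithmetic unreliable can be shown with a few lines of plain Java. The sketch measures how long a local calendar day lasts on the absolute time line. The class and method names are hypothetical, and America/Los_Angeles is chosen only as an example of a zone that observes DST.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.ZoneId;

/** Hypothetical sketch: a local day is not always 24 hours long. */
class DstDayLengthSketch {

    /** Length of the local calendar day in the given zone, in absolute time. */
    static Duration dayLength(LocalDate day, ZoneId zone) {
        return Duration.between(
                day.atStartOfDay(zone).toInstant(),
                day.plusDays(1).atStartOfDay(zone).toInstant());
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("America/Los_Angeles");
        for (String d : new String[] {"2021-03-13", "2021-03-14", "2021-11-07"}) {
            System.out.println(d + " -> " + dayLength(LocalDate.parse(d), zone).toHours() + " hours");
        }
    }
}
```

In 2021 the day on which DST began in Los Angeles (March 14) lasts 23 hours and the day on which it ended (November 7) lasts 25 hours. A daily window keyed on an absolute-time column therefore has to follow the zone rules rather than a constant offset.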
III. Key improvement interpretation
1. FLIP-152: Improved Hive syntax compatibility
FLIP-152 supports Hive DML and DQL syntax, including:
The Hive dialect supports commonly used Hive syntax. Hive has many built-in functions; to use them, the Hive dialect needs to work together with HiveCatalog and the Hive Module. The Hive Module provides all of Hive's built-in functions, which can be called directly once the module is loaded.
Flink SQL can also create and drop catalog functions and user-defined functions through the Hive dialect, which greatly improves compatibility with Hive and makes Flink easier for Hive users to adopt.
2. FLIP-163: Improved SQL Client
Prior to version 1.13, the Flink SQL Client was regarded as a peripheral gadget. In version 1.13, FLIP-163 brought significant improvements:
- The -i parameter loads and initializes DDL up front in one go. This makes it convenient to initialize the DDL statements of multiple tables without running CREATE TABLE commands repeatedly, and it replaces the earlier approach of defining tables in a YAML file;
- The -f parameter is supported, and the SQL file may contain DML (INSERT INTO) statements;
- More useful configuration options are supported:
  - SET sql-client.verbose = true enables verbose mode, which prints the full error information. This makes errors easier to trace than before, when only a single line was shown;
  - SET execution.runtime-mode = streaming / batch selects streaming or batch execution;
  - SET pipeline.name = my_Flink_job sets the job name;
  - SET execution.savepoint.path = /tmp/flink-savepoints/savepoint-bb0dab sets the savepoint path of the job;
  - For jobs that depend on one another, SET table.dml-sync = true selects synchronous rather than asynchronous execution. For example, in an offline pipeline, job B can run only after job A has finished;
- The STATEMENT SET syntax is also supported. A single query may need to write to more than one sink, for example one sink to JDBC and another to HBase:
  - Prior to version 1.13, two queries had to be started to do this;
  - In version 1.13, both can be placed in one statement set and executed as a single job, which reuses operator nodes and saves resources.
3. FLIP-136: Enhanced DataStream and Table conversion
Although Flink SQL greatly lowers the barrier to real-time computing, the high-level Table/SQL abstraction also hides some low-level facilities such as timers and state. Many advanced users want to work with DataStream directly for more flexibility, which requires converting between Table and DataStream. FLIP-136 enhances this conversion and makes it easier to switch between the two.
- Event time and watermarks are carried over when converting between DataStream and Table:
// Expose the record timestamp as an event-time column and reuse the DataStream watermarks
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByMetadata("rowtime", "TIMESTAMP(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build());
- Changelog streams can be converted between Table and DataStream:
// DataStream to Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>): Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>, Schema): Table
// Table to DataStream
StreamTableEnvironment.toChangelogStream(Table): DataStream<Row>
StreamTableEnvironment.toChangelogStream(Table, Schema): DataStream<Row>
IV. Flink SQL 1.14 future planning
The main plans for 1.14 are:
- Remove the legacy planner: since Flink 1.9, when Alibaba contributed the Blink planner, many new features have been developed on the Blink planner only, and the old legacy planner will be removed completely;
- Complete Window TVF: support session windows, allowed lateness for Window TVF, and more;
- Improve schema handling: better end-to-end schema handling and key validation;
- Enhance Flink CDC support: tighter integration with upstream CDC systems, with more Flink SQL operators supporting CDC data streams.
V. Summary
This article took a detailed look at the core features and key improvements of Flink SQL 1.13:
- Window TVF support;
- A systematic fix for time zone and time function problems;
- Improved compatibility between Hive and Flink;
- An improved SQL Client;
- Enhanced DataStream and Table conversion.
We also shared the community's plans for Flink SQL 1.14. We hope that after reading this article you have a better understanding of the changes in this release of Flink SQL, and that in practice you will pay attention to these changes and experience the convenience they bring at the business level.