1. Preface – Structure of this paper
This paper mainly introduces the relationship between Flink SQL and Calcite: Flink SQL parsing relies primarily on Calcite.
The blogger hopes to help you understand how Flink SQL relies on Calcite during parsing, as well as the overall Flink SQL parsing process and the SQL Parser.
The paper is organized into the following sections; if you are interested in a particular chapter, feel free to jump straight to it.
- Background – the execution process of a Flink SQL
  - Use your imagination first
  - Look at Flink's implementation
- Introduction – Calcite's role
  - What is Calcite?
  - Why does Flink SQL use Calcite?
- Case studies – Calcite's capabilities and cases
  - Using Calcite for the first time
  - Relational algebra
  - Calcite's basic models
  - Calcite's processing flow (taking Flink SQL as an example)
  - Why is Calcite so universal?
- Principle analysis – Calcite at work in Flink SQL
  - FlinkSqlParserImpl
  - The generation of FlinkSqlParserImpl
- Summary and outlook
2. Background – the execution process of a Flink SQL
This section describes the execution process of a Flink SQL job. Don't worry if the details are unclear at first; once you have the global picture, the details will be filled in later.
Before introducing the execution process of a Flink SQL, let's take a look at how a Flink DataStream task executes, which helps in understanding how a Flink SQL executes.
- DataStream: with the DataStream API, you write the processing logic yourself inside the various UDFs Flink provides (such as FlatMap and KeyedProcessFunction). The concrete business logic is written as Java code, which is compiled and then executed in the JVM in exactly the same way as an ordinary main function. Since you write the execution logic yourself, this part is relatively easy to understand.
- SQL: the Java compiler cannot recognize and compile an SQL statement for execution, so how does an SQL statement actually get executed?
2.1. Use your imagination first
Let’s think backwards about what we need to do if we want a Flink SQL to execute in the JVM as we expect.
- Overall: look back at DataStream. The JVM can already execute the class files compiled from DataStream Java code, so what's missing is an SQL parsing layer that can parse the SQL logic into DataStream operators, which are then compiled and executed as usual.
- SQL Parser: first of all you need an SQL Parser, i.e., something that can read the SQL syntax and convert it into an AST (concretely, relational algebra).
- DataStream operator mapping: mapping the SQL logic to DataStream. SQL is based on relational algebra, so you can maintain a mapping between each relational algebra operation in an SQL and a DataStream interface. With these mappings, an SQL can be mapped into executable DataStream code. For example:
  - SELECT xxx can be resolved to something like map in DataStream
  - WHERE xxx can be resolved to filter
  - GROUP BY can be resolved to keyBy
  - sum(xxx) and count(xxx) can be parsed into aggregate functions in DataStream
  - etc.
- Code generation: with the SQL AST and the mapping from SQL to DataStream operators in place, concrete code generation can proceed. For example, it needs to work out which fields in the SQL AST are used in the WHERE logic and which are used in the GROUP BY, and generate the corresponding DataStream code.
- Run: after the above steps, an SQL can be translated into a DataStream job and happily executed.
[Figure: the imagined SQL-to-DataStream translation process]
So how does this differ from Flink's actual implementation?
Flink basically does exactly this. There are some additional steps in the middle, and later versions are no longer based on DataStream, but the overall processing logic is the same as above.
Therefore, if you don't yet understand the whole process, you can first use the flow above as your mental model:
SQL -> AST -> codegen (Java code) -> run it, and we're done.
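To make the mapping concrete, here is a minimal hand-written sketch (not what Flink actually generates; the data, field names and query are invented purely for illustration) of how the clauses of a simple SQL could be expressed with the DataStream API:

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SqlToDataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Imagine the SQL: SELECT status, sum(id) FROM source WHERE status = '6' GROUP BY status
        env.fromElements(
                Tuple3.of("6", 1L, 1627254000000L),
                Tuple3.of("6", 2L, 1627254001000L),
                Tuple3.of("7", 3L, 1627254002000L))
            // WHERE status = '6'  ->  filter
            .filter(t -> "6".equals(t.f0))
            // GROUP BY status     ->  keyBy
            .keyBy(t -> t.f0)
            // sum(id)             ->  aggregation on field f1
            .sum(1)
            // SELECT ...          ->  a map/projection could go here
            .print();

        env.execute("sql-to-datastream-sketch");
    }
}
```

Of course, a real SQL-to-program translation has to be generic over arbitrary queries, which is exactly what the parsing, planning and code generation stages described below provide.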
2.2. Look at the implementation of Flink
[Figure: overview of how Flink actually executes an SQL job]
A standard Flink SQL job runs through the following process:
Notes: The concepts of SqlNode and RelNode may be vague at the beginning. Understand the process first, and the concepts will be explained in more detail later.
- Calcite Parser parsing (SQL -> AST, where the AST is a SqlNode tree)
- Calcite Validator validation (SqlNode -> SqlNode)
- SqlNode -> RelNode conversion, producing the logical plan (SqlNode -> RelNode)
- Calcite Optimizer optimization (RelNode -> RelNode, pruning, predicate push-down, etc.)
- Translation to the DataSet/DataStream API (logical plan -> DataSet/DataStream API)
- The subsequent running logic is the same as a DataStream job
As you can see, Flink's implementation is consistent with the overall framework the blogger imagined; the extra parts are mainly the SqlNode validation stage and the optimization stage.
3. Introduction – Calcite’s role in Flink SQL
With an overview of how Flink SQL works in hand, let's take a look at what Calcite does in Flink.
As summarized above, Calcite plays the role of SQL parsing, validation and optimization in Flink SQL.
So what exactly is Calcite, and what is its position?
3.1. What is Calcite?
Calcite is a dynamic data management framework. It can be used to build the various parsing modules of a database system, but it does not include functions such as data storage and data processing.
Calcite aims to be a solution that fits all requirement scenarios: it provides a unified SQL parsing engine for different computing platforms and data sources, but it only provides the query engine and does not actually store the data.
A list of components that currently build on Calcite's capabilities can be found at calcite.apache.org/docs/powere… .
To put it simply, Calcite can do the following (and many other cool things too, if you are interested):
- Custom SQL parsers: for example, if we invent a new engine and want to give it a set of SQL-style interfaces, we can use Calcite directly instead of writing a dedicated SQL parser ourselves; the accompanying execution and optimization engine is also something Calcite already provides.
- SQL Parser (extends SqlAbstractParserImpl): parses the various relational algebra constructs of an SQL into a concrete AST, and each AST node corresponds to a concrete Java model. In the Java world objects are everything, and with these objects (SqlSelect, SqlNode, etc.) you can implement whatever processing logic you need. For example, for the SQL select c,d from source where a = '6', Calcite parses it into an AST model (SqlNode) consisting of SqlSelect, SqlIdentifier, SqlIdentifier and SqlCharStringLiteral nodes.
- SQL Validator (extends SqlValidatorImpl): verifies the correctness of a SqlNode based on syntax, expressions, and table information.
- SQL Optimizer: pruning, predicate push-down, and so on.
The overall composition of these capabilities is shown below:
[Figure: the overall composition of Calcite's capabilities]
Let's actually use Calcite to parse an SQL and see what happens.
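As a minimal standalone sketch (not Flink's code; it only needs calcite-core on the classpath), parsing an SQL with Calcite's default parser looks roughly like this:

```java
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class CalciteParseSketch {
    public static void main(String[] args) throws SqlParseException {
        // Parse the example SQL from above into a SqlNode tree (AST).
        SqlParser parser = SqlParser.create("select c, d from source where a = '6'");
        SqlNode sqlNode = parser.parseQuery();

        // The root node is a SqlSelect; printing it renders the SQL back from the AST.
        System.out.println(sqlNode.getKind());   // SELECT
        System.out.println(sqlNode.toString());
    }
}
```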
3.2. Why does Flink SQL use Calcite?
- No need to reinvent the wheel; limited energy should be spent on more worthwhile things.
- Calcite already has a solution for streaming tables; for details see calcite.apache.org/docs/stream…
4. Case studies – Calcite's capabilities and cases
4.1. Using Calcite for the first time
First of all, getting something running comes before understanding how it works, and it helps us understand it step by step.
There is already a CSV example on the official website; those interested can go straight to calcite.apache.org/docs/tutori… .
After running the CSV demo, and before going deeper into Calcite, we need to take a look at the backbone of both SQL and Calcite: relational algebra.
4.2. Relational algebra
SQL is a query language based on relational algebra; it is a good engineering realization of relational algebra. Relational algebra itself is hard to write directly, while SQL is easy to understand, which is why SQL is what we use in engineering. The relationship between relational algebra and SQL is as follows:
- A piece of SQL can be parsed into a combination of relational algebra expressions; every operation in SQL can be converted into an expression of relational algebra (a small worked example follows below).
- SQL execution optimization (all optimization rests on the premise that the execution result is the same before and after the optimization, i.e., equivalence transformation) is based on relational algebra.
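As a tiny worked example (the relation and column names here are invented for illustration), a query and its relational algebra form:

```latex
% SELECT a1 FROM A WHERE a2 = 5    corresponds to
\pi_{a_1}\bigl(\sigma_{a_2 = 5}(A)\bigr)

% and with a join:  SELECT a1 FROM A, B WHERE A.a1 = B.b1
\pi_{a_1}\bigl(\sigma_{A.a_1 = B.b_1}(A \times B)\bigr)
```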
4.2.1. Commonly used relational algebra
To summarize, the commonly used relational algebra operations are shown below:
[Table: commonly used relational algebra operations, including selection (σ), projection (π), union (∪), difference (-), Cartesian product (×) and join (⋈)]
4.2.2. Relational algebra equivalence transformations: the pillar of SQL optimization
Relational algebra equivalence transformation is the basic theory behind the Calcite Optimizer.
Here are some common equivalence transformations:
1. Commutativity of join (⋈) and Cartesian product (×)
2. Associativity of join (⋈) and Cartesian product (×)
3. Cascade of projections (π)
4. Cascade of selections (σ)
5. Commuting selection (σ) with projection (π)
6. Commuting selection (σ) with Cartesian product (×)
7. Commuting selection (σ) with union (∪)
8. Commuting selection (σ) with difference (-)
9. Commuting projection (π) with Cartesian product (×)
10. Commuting projection (π) with union (∪)
Now let's look at a practical example of SQL optimization based on relational algebra.
There are three relations: A(a1, a2, a3, …), B(b1, b2, b3, …), C(a1, b1, c1, c2, …).
There is a query as follows:
SELECT A.a1 FROM A, B, C WHERE A.a1 = C.a1 AND B.b1 = C.b1 AND f(c1)
1. First, convert the SQL into a relational algebra syntax tree.
2. Optimization: apply the cascade of selections (σ).
3. Optimization: commute selection (σ) with the Cartesian product (×).
4. Optimization: commute projection (π) with the Cartesian product (×).
A sketch of this transformation chain in relational algebra notation follows.
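The original step-by-step tree diagrams are images; as a substitute, here is a hedged, hand-written sketch of the same transformation chain in relational algebra notation (only the selection push-down of steps 2 and 3 is written out; the projection push-down of step 4 proceeds analogously):

```latex
% Original query as relational algebra
\pi_{A.a_1}\bigl(\sigma_{A.a_1 = C.a_1 \,\wedge\, B.b_1 = C.b_1 \,\wedge\, f(c_1)}(A \times B \times C)\bigr)

% Step 2. Cascade of selections: split the conjunctive predicate
= \pi_{A.a_1}\Bigl(\sigma_{A.a_1 = C.a_1}\bigl(\sigma_{B.b_1 = C.b_1}\bigl(\sigma_{f(c_1)}(A \times B \times C)\bigr)\bigr)\Bigr)

% Step 3. Commute selection with Cartesian product: push each predicate down
%         to the smallest sub-expression containing the columns it references
= \pi_{A.a_1}\Bigl(\sigma_{A.a_1 = C.a_1}\bigl(A \times \sigma_{B.b_1 = C.b_1}\bigl(B \times \sigma_{f(c_1)}(C)\bigr)\bigr)\Bigr)
```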
So we have a general idea of relational algebra.
In addition, to understand Flink SQL and Calcite more deeply, we also need to know the important models in Calcite's code architecture.
4.3. Calcite's basic models
There are two very basic and important models in Calcite that we need to know in order to understand the Flink SQL parsing process:
- SqlNode: converted from the SQL text; it can be understood as a model that directly mirrors the hierarchical structure of the SQL
- RelNode: converted from the SqlNode; it can be understood as the SqlNode turned into relational algebra, a model expressing the relational algebra hierarchy
Take the following Flink SQL as an example and look at its SqlNode and RelNode:
```sql
SELECT
    sum(part_pv) as pv,
    window_start
FROM (
    SELECT
        count(1) as part_pv,
        cast(tumble_start(rowtime, INTERVAL '60' SECOND) as bigint) * 1000 as window_start
    FROM
        source_db.source_table
    GROUP BY
        tumble(rowtime, INTERVAL '60' SECOND)
        , mod(id, 1024)
)
GROUP BY
    window_start
```
SqlNode reflects the hierarchical structure of the SQL itself: the select list, FROM, WHERE, GROUP BY, and so on.
RelNode reflects the hierarchy of relational algebra operations; each RelNode has inputs, forming a tree structure like the one in the optimization case above.
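The original figure of the RelNode tree is an image; as a rough substitute, the logical plan of the query above can be printed with Calcite's RelOptUtil, and its shape is sketched in the comments below (the plan text is hand-written and approximate, not actual Flink output):

```java
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;

public class RelNodePrinter {
    // Prints a RelNode tree in Calcite's explain format.
    static void print(RelNode relNode) {
        System.out.println(RelOptUtil.toString(relNode));
    }
    // For the query above, the output is roughly of this shape (approximate):
    //
    // LogicalAggregate(group=[{window_start}], pv=[SUM(part_pv)])
    //   LogicalProject(part_pv, window_start)
    //     LogicalAggregate(group=[{tumble(rowtime, 60s), mod(id, 1024)}], part_pv=[COUNT()])
    //       LogicalProject(rowtime, id)
    //         LogicalTableScan(table=[[source_db, source_table]])
}
```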
4.4. Calcite's processing flow (taking Flink SQL as an example)
Combining the Calcite models introduced in the previous section with the implementation of Flink SQL, let's walk through the processing flow:
- SQL parsing (SQL -> SqlNode)
- SqlNode validation (SqlNode -> SqlNode)
- Semantic analysis (SqlNode -> RelNode)
- Optimization phase (RelNode -> RelNode)
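Before looking at Flink's concrete classes, these four stages can be sketched with Calcite's own Planner API (a minimal sketch, independent of Flink; the schema setup and the query string are placeholders, and a real run needs actual tables registered in the root schema so that parsing and validation can succeed):

```java
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalcitePipelineSketch {
    public static void main(String[] args) throws Exception {
        // A root schema; tables would need to be registered here for a real query.
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema)
                .build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode parsed    = planner.parse("SELECT ...");   // 1. SQL     -> SqlNode (parse)
        SqlNode validated = planner.validate(parsed);      // 2. SqlNode -> SqlNode (validate)
        RelRoot relRoot   = planner.rel(validated);        // 3. SqlNode -> RelNode (semantic analysis)
        System.out.println(RelOptUtil.toString(relRoot.rel));
        // 4. Optimization then rewrites this RelNode tree (RelNode -> RelNode).
    }
}
```

Flink does not call the Planner exactly this way; it wires FlinkSqlParserImpl, its own validator and its own optimizer into this same pipeline, as the following sections show.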
4.4.1. Flink SQL demo
```sql
SELECT
    sum(part_pv) as pv,
    window_start
FROM (
    SELECT
        count(1) as part_pv,
        cast(tumble_start(rowtime, INTERVAL '60' SECOND) as bigint) * 1000 as window_start
    FROM
        source_db.source_table
    GROUP BY
        tumble(rowtime, INTERVAL '60' SECOND)
        , mod(id, 1024)
)
GROUP BY
    window_start
```
The first three steps (parsing, validation, and semantic analysis) are performed when TableEnvironment#sqlQuery is executed.
The final optimization step happens when the sink operation is executed, which in this case is tEnv.toRetractStream(result, Row.class).
The full source code for this example can be obtained from the blogger's official account backend.
4.4.2. SQL parsing phase (SQL -> SqlNode)
The SQL Parser parses the SQL into SqlNode; this is done when TableEnvironment#sqlQuery is executed.
Stepping through it in a debugger, you can see that the parser implementation class used by Flink SQL is FlinkSqlParserImpl, and that the result of parsing is the SqlNode tree.
4.4.3. SqlNode validation (SqlNode -> SqlNode)
The first step above produces an unvalidated SqlNode object; this step is the syntax check phase. Before the check can run, the validator needs metadata information, including table names, field names, function names, and data types.
Debugging into the validation step shows that the validator implementation class used by Flink SQL is FlinkCalciteSqlValidator, which holds the metadata information needed for the check.
4.4.4. Semantic analysis (SqlNode -> RelNode)
This step converts the SqlNode into a RelNode, i.e., generates the corresponding relational algebra level logic (commonly called the logical plan).
4.4.5. Optimization phase (RelNode -> RelNode)
This step is the optimization phase; you can step through the code in a debugger to see the details.
4.5. Why is Calcite so universal?
Taking the Calcite Parser as an example, why can this one module serve so many different engines? The other modules follow a similar approach.
Conclusion: the Calcite Parser module exposes interfaces through which the parsing logic and rules can be customized. The blogger has drawn a picture to illustrate:
[Figure: user-defined parsing rules + Calcite framework -> engine-specific SQL parser]
As the figure shows, the engine-specific SQL parser is generated from one input: the user-defined parsing rules. The concrete engine SQL parser is generated according to these rules, and the dynamic generation of the parser relies on components such as JavaCC. Calcite provides the unified SQL AST model, the optimization model interfaces, and so on, while the concrete parsing implementation is left to the user.
JavaCC generates the concrete SQL Parser code from the Parser.jj file defined in Calcite. The SQL Parser's job is to convert SQL into an AST (SqlNode). More details on Calcite's capabilities can be found at matt33.com/2019/03/07/… .
You can download the Calcite source from github.com/apache/calc… and then look at the core module.
4.5.1. What is JavaCC?
JavaCC is one of the most popular parser generators developed in Java. A parser generator reads a context-free grammar with special annotations and translates it into a Java program that can recognize and match that grammar. It is 100% pure Java and runs on a variety of platforms.
JavaCC is general purpose: you can define your own DSL together with its parser.
For example, if you ever feel that SQL is not concise enough, you can use JavaCC to define a more concise user-defined QL and let JavaCC generate the parser for it. Pretty neat: you can build your own compiler.
4.5.2. Running JavaCC
This section does not cover JavaCC syntax in depth; it just uses Simple1.jj as an example. For detailed syntax and features, please refer to the official website (javacc.github.io/javacc/) or the following posts:
- www.cnblogs.com/Gavin_Liu/…
- www.yangguo.info/2014/12/13/…
- www.engr.mun.ca/~theo/JavaC…
Simple1.jj recognizes strings of matched curly braces (an equal number of { and }, properly nested), followed by zero or more line terminators.
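For reference, the Simple1.jj grammar from the JavaCC documentation looks roughly like this (reproduced from memory, so treat it as a sketch):

```
// Simple1.jj: recognizes nested matched braces followed by line terminators.
PARSER_BEGIN(Simple1)

public class Simple1 {
    public static void main(String[] args) throws ParseException {
        Simple1 parser = new Simple1(System.in);
        parser.Input();
    }
}

PARSER_END(Simple1)

void Input() :
{}
{
    MatchedBraces() ("\n" | "\r")* <EOF>
}

void MatchedBraces() :
{}
{
    "{" [ MatchedBraces() ] "}"
}
```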
Here are examples of valid strings: {}, {{{{{}}}}}, etc.
Here are examples of invalid strings: {{{{, {}{}, {}}, {{}{}}, etc.
Let's actually compile Simple1.jj and generate the concrete parser code.
Add the JavaCC build plugin to the POM:
```xml
<plugin>
  <!-- This must be run AFTER the fmpp-maven-plugin -->
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>javacc-maven-plugin</artifactId>
  <version>2.4</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <id>javacc</id>
      <goals>
        <goal>javacc</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
        <includes>
          <include>**/Simple1.jj</include>
        </includes>
        <!-- This must be kept synced with Apache Calcite. -->
        <lookAhead>1</lookAhead>
        <isStatic>false</isStatic>
        <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```
After compiling, the generated code appears under generated-sources.
Then copy the generated code into the source directory.
Running the generated code, inputs such as {} and {{}} pass the check, while an input like {{ that does not conform to the rule throws an exception.
That is basically how JavaCC works.
Those interested can try building their own small compiler.
4.5.3. What is FMPP?
FMPP is a FreeMarker-based template producer. Users manage their variables in one place and then use FTL templates plus those variables to generate the final files. In Calcite, FMPP serves as the unified manager of variables and templates, and the Parser.jj file is generated through FMPP.
5. Principle analysis – Calcite at work in Flink SQL
The blogger drew a picture of the dependencies between the important components:
FMPP (generates Parser.jj from templates) -> JavaCC (generates the parser code from Parser.jj) -> Calcite.
Before walking through the Parser generation process, let's first look at the Parser that Flink ultimately generates: FlinkSqlParserImpl (using the Blink Planner here).
5.1. FlinkSqlParserImpl
Start with the following example (based on Flink 1.13.1):
```java
public class ParserTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStream<Tuple3<String, Long, Long>> tuple3DataStream = env.fromCollection(Arrays.asList(
                Tuple3.of("2", 1L, 1627254000000L),
                Tuple3.of("2", 1L, 1627218000000L + 5000L),
                Tuple3.of("2", 101L, 1627218000000L + 6000L),
                Tuple3.of("2", 201L, 1627218000000L + 7000L),
                Tuple3.of("2", 301L, 1627218000000L + 7000L),
                Tuple3.of("2", 301L, 1627218000000L + 7000L),
                Tuple3.of("2", 301L, 1627218000000L + 7000L),
                Tuple3.of("2", 301L, 1627218000000L + 7000L),
                Tuple3.of("2", 301L, 1627218000000L + 7000L),
                Tuple3.of("2", 301L, 1627218000000L + 86400000 + 7000L)))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>>(Time.seconds(0L)) {
                            @Override
                            public long extractTimestamp(Tuple3<String, Long, Long> element) {
                                return element.f2;
                            }
                        });

        tEnv.registerFunction("mod", new Mod_UDF());
        tEnv.registerFunction("status_mapper", new StatusMapper_UDF());

        tEnv.createTemporaryView("source_db.source_table", tuple3DataStream,
                "status, id, timestamp, rowtime.rowtime");

        String sql = "SELECT\n"
                + " count(1),\n"
                + " cast(tumble_start(rowtime, INTERVAL '1' DAY) as string)\n"
                + "FROM\n"
                + " source_db.source_table\n"
                + "GROUP BY\n"
                + " tumble(rowtime, INTERVAL '1' DAY)";

        Table result = tEnv.sqlQuery(sql);

        tEnv.toAppendStream(result, Row.class).print();

        env.execute();
    }
}
```
The debugging process follows the SQL -> SqlNode analysis described above. Locating the SqlParser directly, you can see that the concrete Parser is FlinkSqlParserImpl.
The concrete code lives in flink-table-planner-blink_2.11-1.13.1.jar.
The final result of parsing is the SqlNode tree of the query.
Now let's look at how FlinkSqlParserImpl is generated with Calcite.
In Flink's implementation this sits in the flink-sql-parser module under flink-table (source code based on Flink 1.13.1).
Flink implements the overall process described above through a set of Maven plugins.
5.2. How FlinkSqlParserImpl is generated
Next, look at the entire Parser generation process.
5.2.1. Flink imports Calcite
The maven-dependency-plugin unpacks the Parser.jj template from calcite-core.jar into the Flink project's build directory:
```xml
<plugin>
  <!-- Extract parser grammar template from calcite-core.jar and put it under
       ${project.build.directory} where all freemarker templates are. -->
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-dependency-plugin</artifactId>
  <executions>
    <execution>
      <id>unpack-parser-template</id>
      <phase>initialize</phase>
      <goals>
        <goal>unpack</goal>
      </goals>
      <configuration>
        <artifactItems>
          <artifactItem>
            <groupId>org.apache.calcite</groupId>
            <artifactId>calcite-core</artifactId>
            <type>jar</type>
            <overWrite>true</overWrite>
            <outputDirectory>${project.build.directory}/</outputDirectory>
            <includes>**/Parser.jj</includes>
          </artifactItem>
        </artifactItems>
      </configuration>
    </execution>
  </executions>
</plugin>
```
5.2.2. FMPP generates Parser.jj
The maven-resources-plugin copies the codegen templates, and the fmpp-maven-plugin then generates the Parser.jj code:
```xml
<plugin>
  <artifactId>maven-resources-plugin</artifactId>
  <executions>
    <execution>
      <id>copy-fmpp-resources</id>
      <phase>initialize</phase>
      <goals>
        <goal>copy-resources</goal>
      </goals>
      <configuration>
        <outputDirectory>${project.build.directory}/codegen</outputDirectory>
        <resources>
          <resource>
            <directory>src/main/codegen</directory>
            <filtering>false</filtering>
          </resource>
        </resources>
      </configuration>
    </execution>
  </executions>
</plugin>
<plugin>
  <groupId>com.googlecode.fmpp-maven-plugin</groupId>
  <artifactId>fmpp-maven-plugin</artifactId>
  <version>1.0</version>
  <dependencies>
    <dependency>
      <groupId>org.freemarker</groupId>
      <artifactId>freemarker</artifactId>
      <version>2.3.28</version>
    </dependency>
  </dependencies>
  <executions>
    <execution>
      <id>generate-fmpp-sources</id>
      <phase>generate-sources</phase>
      <goals>
        <goal>generate</goal>
      </goals>
      <configuration>
        <cfgFile>${project.build.directory}/codegen/config.fmpp</cfgFile>
        <outputDirectory>target/generated-sources</outputDirectory>
        <templateDirectory>${project.build.directory}/codegen/templates</templateDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```
5.2.3. JavaCC generates the Parser
The javacc-maven-plugin generates the Parser from the Parser.jj file:
```xml
<plugin>
  <!-- This must be run AFTER the fmpp-maven-plugin -->
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>javacc-maven-plugin</artifactId>
  <version>2.4</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <id>javacc</id>
      <goals>
        <goal>javacc</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.build.directory}/generated-sources/</sourceDirectory>
        <includes>
          <include>**/Parser.jj</include>
        </includes>
        <!-- This must be kept synced with Apache Calcite. -->
        <lookAhead>1</lookAhead>
        <isStatic>false</isStatic>
        <outputDirectory>${project.build.directory}/generated-sources/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```
5.2.4. Look at the Parser
The resulting Parser is FlinkSqlParserImpl.
5.2.5. The Blink Planner imports flink-sql-parser
The Blink Planner (flink-table-planner-blink) packages flink-sql-parser and flink-sql-parser-hive.
6. Summary and Outlook
This paper mainly introduced how Flink SQL depends on Calcite and how the Flink SQL Parser is generated. If it helped you understand Flink SQL parsing, please give the blogger a little love (follow + like + look again).
7. References
www.slideshare.net/JordanHalte…
arxiv.org/pdf/1802.10…
changbo.tech/blog/7dec2e…
www.liaojiayi.com/calcite/
www.zhihu.com/column/c_1…
blog.csdn.net/QuinnNorris…
www.pianshen.com/article/721…
matt33.com/2019/03/07/…
www.jianshu.com/p/edf503a2a…
blog.csdn.net/u013007900/…
blog.csdn.net/u013007900/…
www.ptbird.cn/optimizatio…
book.51cto.com/art/201306/…
book.51cto.com/art/201306/…
miaowenting.site/2019/11/10/…