1. Preface – Structure of this article
After the groundwork laid in the previous installments, this one finally gets to an actual query.
Most of you are familiar with the DataStream API. With DataStream, the code you write is the logic that ultimately gets executed.
The execution process of Flink SQL, however, is much less familiar.
So this article takes you into the logic of Flink SQL queries through simple ETL and group aggregation (sum, count, and so on) queries in the sections below. The goal is that you at least know what Flink SQL is doing while it runs.
- Background – What do you not know about Flink SQL?
- Objective – What can this article help you understand about Flink SQL?
- In Practice – Simple Query cases and how they work
- Summary and outlook
First, the conclusions:
- Scenarios: Flink SQL is well suited to simple ETL and to aggregated metrics in almost every scenario.
- Syntax: Flink SQL syntax is largely the same as other SQL dialects, so syntax is rarely an obstacle to adopting it.
- Runtime: a few tips for inspecting Flink SQL jobs:
  - Open the Flink web UI to see what the job is currently doing. The operator names show directly which operator handles which logic.
  - To find out what code your Flink job actually executes, inspect the Transformation that the SQL is finally translated into. The code generated by Flink SQL is in there too.
  - If you are not sure how a job behaves in production, try running it locally.
2. Background – What do you not know about Flink SQL?
Everyone comes to Flink SQL with some original motivation and some starting assumptions. Think back to when you first encountered Flink SQL: what did you have in mind?
The blogger has roughly sorted the questions and thoughts from those early days into the following:
- Scenarios: Flink SQL is adopted first of all to improve efficiency. Which scenarios is Flink SQL better suited to than DataStream?
- Syntax: when I write SQL, will Flink SQL syntax differ from other SQL dialects?
- Runtime: I write a SQL statement and it runs, but it is a black box to me. How do I know what the job is doing? Is there a good way to understand how Flink SQL works?
- Misconceptions: what are common misunderstandings about how Flink SQL works?
- Pitfalls: what pitfalls does Flink SQL typically have? Knowing them in advance helps us avoid them.
It is exactly these doubts that make many people hesitate to introduce Flink SQL in their own teams.
3. Objective – What does this article help you understand about Flink SQL?
Here are the objectives of this article:
- Scenarios: help you understand which scenarios suit Flink SQL best
- Syntax: help you get familiar with Flink SQL syntax
- Runtime: use simple queries to see what a Flink SQL job does and how it works
- Misconceptions: common misunderstandings about the computation mechanism
- Pitfalls: the pitfalls Flink SQL typically has
One article cannot cover everything, so this one sticks to the simplest ETL and aggregation scenarios and concentrates on the first three points.
The last two points will be covered scenario by scenario in future articles.
4. In Practice – Simple Query cases and how they work
4.1. Scenario Question: What scenarios are suitable for Flink SQL?
Let me be upfront: Flink SQL really is a good fit for DWD cleaning and DWS aggregation.
This is mainly about real-time data warehouses. With DWD cleaning and DWS aggregation both handled, Flink SQL covers most real-time warehouse scenarios.
Flink SQL is awesome!!
But!!
In my experience, not every DWD cleaning or DWS aggregation scenario suits Flink SQL (as of the time of writing)!!
The scenarios that do not currently suit Flink SQL all come down to one thing: a performance loss compared with DataStream.
First, a summary of the usage scenarios:
1. DWD: simple cleaning, complex cleaning, dimension expansion, and use of various UDFs
2. DWS: Various types of aggregation
These split into suitable and unsuitable scenarios. Since one article cannot cover everything, only the overall conclusion is given here; the specific scenarios are described in detail later.
- Suitable scenarios:
  - Simple DWD cleaning
  - DWS aggregation, in all its forms
- Scenarios that are not currently suitable:
  - Complex DWD cleaning: for example, cleaning that leans on many UDFs, especially heavy JSON parsing
  - Dimension-table joins: with DataStream, it is common to buffer a batch of records and query the external service in bulk. Flink SQL offers a local cache and asynchronous lookups for this scenario, but it still queries the external store one record at a time, which leaves a performance gap compared with batched access.
4.2. Syntax and runtime questions
In short, for anyone who has worked with SQL before, Flink SQL syntax is the same as other SQL and easy to follow. The one exception is how window aggregations are written.
This section walks through specific cases in detail.
4.2.1. ETL
The simplest type of ETL job:
SELECT select_list FROM table_expression [ WHERE boolean_expression ]
1. Scenario: simple DWD cleaning and filtering
Let's start with the scenarios Flink SQL suits best: ETL and group aggregation.
Source table:
CREATE TABLE source_table (
order_number BIGINT,
price DECIMAL(32,2)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.order_number.min' = '10',
'fields.order_number.max' = '11'
)
Sink table:
CREATE TABLE sink_table (
order_number BIGINT,
price DECIMAL(32,2)
) WITH (
'connector' = 'print'
)
ETL logic:
insert into sink_table
select * from source_table
where order_number = 10
2. Run: as you can see, a Flink SQL job writes the processing logic into the operator names.
Flink SQL Tips 1:
This is our first tip for inspecting a Flink SQL job. When you want to know what your Flink job is doing, your first move should be to open the Flink web UI and look at the running job. The operator names show directly which operator handles which logic.
3. Results:
+I[10, 337546916355686018150362513408.00]
+I[10, 734895198061906189720381030400.00]
+I[10, 496632591763800912960818249728.00]
+I[10, 495090465926828588045441171456.00]
+I[10, 167305033642317182838130081792.00]
+I[10, 409466913112794578407573684224.00]
+I[10, 894352160414515330502514180096.00]
+I[10, 680063350384451712068576346112.00]
+I[10, 50807402446574997641386524672.00]
+I[10, 646597093362022945955245981696.00]
+I[10, 233317961584082024331537809408.00]
...
4. Principle:
Start with the entry point of a Flink SQL job's execution.
First, compare how a CREATE TABLE statement and a query statement are executed.
Once execution reaches executeInternal, a different action is taken depending on the concrete Operation.
For a CreateTableOperation, the table information is stored in the catalogManager.
For the query, which is a concrete ModifyOperation, the logic is translated into the corresponding Transformations.
The Transformations carry the overall execution logic as well as the generated code for the SQL.
Now take a closer look at what is inside these Transformations.
The outermost one is a LegacySinkTransformation, the sink operator, which wraps the print sink function. That one is easy to understand.
The middle layer is a OneInputTransformation, which holds the filter and projection from the SQL (select * from source_table where order_number = 10).
The generated code sits in the code field of the GeneratedOperator. We copy that code out separately to read it.
The generated operator implements OneInputStreamOperator directly, so the logic runs at the operator level without going through the DataStream API.
The most important part is processElement, which holds both the field parsing and the execution logic.
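To make the processElement idea concrete, here is a minimal sketch in plain Python of what the filter query above boils down to. It is not the code Flink generates: `process_element` and `run_etl` are hypothetical names, and the sample rows are made up. It only mirrors the logic of `select * from source_table where order_number = 10`.

```python
from decimal import Decimal


def process_element(row, out):
    """Hypothetical stand-in for the generated operator's processElement."""
    order_number, price = row  # read the two fields of source_table
    # WHERE order_number = 10: rows that fail the predicate are dropped.
    if order_number == 10:
        # SELECT *: forward both fields unchanged as an insert (+I) record.
        out.append(("+I", order_number, price))


def run_etl(rows):
    """Feed rows through the operator one at a time, as a stream would."""
    out = []
    for row in rows:
        process_element(row, out)
    return out


# Made-up sample input; datagen would produce order_number 10 or 11.
sample_rows = [(10, Decimal("1.50")), (11, Decimal("2.00")), (10, Decimal("3.25"))]
etl_result = run_etl(sample_rows)
for kind, order_number, price in etl_result:
    print(f"{kind}[{order_number}, {price}]")
```

Only the rows with order_number 10 reach the sink, which is why every line of the printed result above starts with `+I[10, ...]`.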
Flink SQL Tips 2:
This is our second tip for inspecting a Flink SQL job. To find out what code your Flink job actually executes, inspect the Transformation that the SQL is finally translated into.
4.2.2. Deduplication scenario
1. Scenario: the simplest deduplication scenario
Source table:
CREATE TABLE source_table (
string_field STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.string_field.length' = '3'
)
Sink table:
CREATE TABLE sink_table (
string_field STRING
) WITH (
'connector' = 'print'
)
Data processing:
insert into sink_table
select distinct string_field
from source_table
2. Run: again, the Flink SQL job writes the processing logic into the operator names.
3. Results:
+I[cd3]
+I[8fc]
+I[b0c]
+I[1d8]
+I[e28]
+I[c5f]
+I[e7d]
+I[dfa]
+I[1fe]
...
4. Principle:
Here we focus only on the logic that differs from the previous case.
The first difference is a PartitionTransformation with a KeyGroupStreamPartitioner, which is the partitioning logic. Let's look at the code generated for it.
In the shuffle, string_field is used as the key.
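The idea behind a key-group shuffle can be sketched in a few lines of Python. Treat this as an illustration under assumptions rather than Flink's implementation: CRC32 stands in for whatever hash Flink applies to the key, the values 128 and 4 for max parallelism and parallelism are made up, and the function names are hypothetical. The point is only that a key is first mapped to a key group, and the key group then decides the downstream subtask.

```python
import zlib


def assign_to_key_group(key, max_parallelism):
    # CRC32 is a stand-in chosen for determinism; Flink hashes the key its own way.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index(key_group, max_parallelism, parallelism):
    # Spread the key groups evenly over the parallel subtasks.
    return key_group * parallelism // max_parallelism


def channel_for(key, max_parallelism=128, parallelism=4):
    """Pick the downstream subtask for a record keyed by string_field."""
    key_group = assign_to_key_group(key, max_parallelism)
    return operator_index(key_group, max_parallelism, parallelism)


keys = ["cd3", "8fc", "b0c", "cd3"]
channels = [channel_for(k) for k in keys]
print(channels)
```

Whatever the hash, the property that matters for deduplication is that equal string_field values always land on the same subtask, so that subtask's state sees every occurrence of the key.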
The second is the KeyedProcessOperator inside the OneInputTransformation, which holds the deduplication logic.
Only three pieces of the generated function are business logic, and the RowData objects in it are all initialized with size 0. So where does the deduplication actually happen?
Following the processing logic gives the answer: the deduplication is implemented in GroupAggFunction#processElement.
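As a rough mental model of the deduplication, assume the operator keeps one piece of state per key and emits a row only the first time that key shows up. The sketch below uses a plain Python set in place of Flink's keyed state; `distinct_changelog` is a hypothetical name, not a Flink API.

```python
def distinct_changelog(values):
    """Emit +I for a value the first time it is seen, and nothing afterwards."""
    seen = set()  # stands in for the per-key state of the keyed operator
    out = []
    for value in values:
        if value not in seen:
            seen.add(value)
            out.append(f"+I[{value}]")
    return out


dedup_result = distinct_changelog(["cd3", "8fc", "cd3", "b0c", "8fc"])
for line in dedup_result:
    print(line)
```

This matches the shape of the printed result above: every line is an insert, and no value appears twice.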
4.2.3. Group aggregation scenario
4.2.3.1. Simple Aggregation Scenario
1. Scenario: the simplest aggregation scenario
count, sum, avg, max, min, and so on:
Source table:
CREATE TABLE source_table (
order_id STRING,
price BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.order_id.length' = '1',
'fields.price.min' = '1',
'fields.price.max' = '1000000'
)
Sink table:
CREATE TABLE sink_table (
order_id STRING,
count_result BIGINT,
sum_result BIGINT,
avg_result DOUBLE,
min_result BIGINT,
max_result BIGINT
) WITH (
'connector' = 'print'
)
Data processing logic:
insert into sink_table
select order_id,
count(*) as count_result,
sum(price) as sum_result,
avg(price) as avg_result,
min(price) as min_result,
max(price) as max_result
from source_table
group by order_id
2. Run: as in the previous cases, the operator names in the web UI show the processing logic.
3. Results:
+I[1, 1, 415300, 415300.0, 415300, 415300]
+I[d, 1, 416878, 416878.0, 416878, 416878]
+I[0, 1, 120837, 120837.0, 120837, 120837]
+I[c, 1, 337749, 337749.0, 337749, 337749]
+I[7, 1, 387053, 387053.0, 387053, 387053]
+I[8, 1, 387042, 387042.0, 387042, 387042]
+I[2, 1, 546317, 546317.0, 546317, 546317]
+I[e, 1, 22131, 22131.0, 22131, 22131]
+I[9, 1, 651731, 651731.0, 651731, 651731]
-U[0, 1, 120837, 120837.0, 120837, 120837]
+U[0, 2, 566664, 283332.0, 120837, 445827]
+I[b, 1, 748659, 748659.0, 748659, 748659]
-U[7, 1, 387053, 387053.0, 387053, 387053]
+U[7, 2, 1058056, 529028.0, 387053, 671003]
4. Principle:
First, look at the Transformation.
The logic is the same as before: the work is done in GroupAggFunction, which performs the calculation in five steps.
Next, look at the logic of the generated function code.
First, how count is computed.
Then, how sum is computed.
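The -U/+U pairs in the result are easier to read with a small model in hand. The sketch below is a plain Python approximation, not the generated code: `Accumulator` and `group_agg_changelog` are hypothetical names, a dict stands in for keyed state, and avg uses ordinary float division, which may not match Flink's numeric typing. It shows one accumulator per order_id, an insert on the first row for a key, and a retract-then-update pair on every later row.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Accumulator:
    count: int = 0
    total: int = 0
    min_price: Optional[int] = None
    max_price: Optional[int] = None

    def add(self, price):
        self.count += 1
        self.total += price
        self.min_price = price if self.min_price is None else min(self.min_price, price)
        self.max_price = price if self.max_price is None else max(self.max_price, price)

    def result(self):
        # Plain float division for avg; Flink's numeric typing may differ.
        return (self.count, self.total, self.total / self.count,
                self.min_price, self.max_price)


def group_agg_changelog(rows):
    state = {}  # order_id -> Accumulator, standing in for keyed state
    out = []
    for order_id, price in rows:
        acc = state.get(order_id)
        if acc is None:
            # First row for this key: emit an insert.
            acc = state[order_id] = Accumulator()
            acc.add(price)
            out.append(("+I", order_id) + acc.result())
        else:
            # Later rows: retract the old result, then emit the new one.
            out.append(("-U", order_id) + acc.result())
            acc.add(price)
            out.append(("+U", order_id) + acc.result())
    return out


agg_changes = group_agg_changelog([("0", 120837), ("b", 748659), ("0", 445827)])
for change in agg_changes:
    print(change)
```

The input prices are taken from the result above: the second row for key 0 retracts the old aggregate and emits count 2, sum 566664, avg 283332.0, min 120837, max 445827.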
4.2.3.2. Deduplicated aggregation scenario
1. Scenario: aggregation over distinct values (count distinct)
Source table:
CREATE TABLE source_table (
dim STRING,
user_id BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
)
Sink table:
CREATE TABLE sink_table (
dim STRING,
uv BIGINT
) WITH (
'connector' = 'print'
)
Data processing:
insert into sink_table
select dim,
count(distinct user_id) as uv
from source_table
group by dim
2. Run: as in the previous cases, the operator names in the web UI show the processing logic.
3. Results:
+U[9, 3097]
-U[a, 3054]
+U[a, 3055]
-U[8, 3030]
+U[8, 3031]
-U[4, 3137]
+U[4, 3138]
-U[6, 3139]
+U[6, 3140]
-U[0, 3082]
+U[0, 3083]
4. Principle:
Only the differences from the previous cases matter here.
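The main difference from a plain count is that the operator has to remember which user_id values it has already counted for each dim. A minimal sketch, assuming a Python set per key in place of whatever state structure Flink actually uses (`count_distinct_changelog` is a hypothetical name):

```python
def count_distinct_changelog(rows):
    """Model count(distinct user_id) grouped by dim as a changelog."""
    seen = {}  # dim -> set of user_ids already counted
    out = []
    for dim, user_id in rows:
        users = seen.setdefault(dim, set())
        before = len(users)
        users.add(user_id)
        after = len(users)
        if after == before:
            continue  # user already counted for this dim: result unchanged
        if before == 0:
            out.append(f"+I[{dim}, {after}]")
        else:
            out.append(f"-U[{dim}, {before}]")
            out.append(f"+U[{dim}, {after}]")
    return out


uv_changes = count_distinct_changelog([("a", 1), ("a", 1), ("a", 2), ("b", 7)])
for line in uv_changes:
    print(line)
```

A repeated user_id produces no output at all in this model, because the distinct count does not change. The cost is that the state grows with the number of distinct users per dim.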
4.2.3.3. Syntactic sugar
1. GROUPING SETS
Multi-dimensional computation. GROUPING SETS is syntactic sugar that lets users list exactly the dimension combinations their scenario needs.
Sink table:
CREATE TABLE sink_table (
supplier_id STRING,
product_id STRING,
total BIGINT
) WITH (
'connector' = 'print'
)
Data processing logic:
insert into sink_table
SELECT
supplier_id,
product_id,
COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS ((supplier_id, product_id), (supplier_id), ())
The result is equivalent to:
insert into sink_table
SELECT
supplier_id,
product_id,
COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY supplier_id, product_id
UNION ALL
SELECT
supplier_id,
cast(null as string) as product_id,
COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY supplier_id
UNION ALL
SELECT
cast(null as string) AS supplier_id,
cast(null as string) AS product_id,
COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
The results are as follows:
+I[supplier1, product1, 1]
+I[supplier1, null, 1]
+I[null, null, 1]
+I[supplier1, product2, 1]
-U[supplier1, null, 1]
+U[supplier1, null, 2]
-U[null, null, 1]
+U[null, null, 2]
+I[supplier2, product3, 1]
+I[supplier2, null, 1]
-U[null, null, 2]
+U[null, null, 3]
+I[supplier2, product4, 1]
-U[supplier2, null, 1]
+U[supplier2, null, 2]
-U[null, null, 3]
+U[null, null, 4]
GROUPING SETS lets you cut redundant code in multi-dimensional scenarios. How GROUPING SETS works internally will be described in a follow-up article.
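Until that follow-up, one way to reason about the result above is to imagine each input row being counted once per grouping set, with the columns outside the set replaced by null. The Python sketch below models exactly that. It is an illustration only: `grouping_sets_changelog` and `fmt` are hypothetical names, and tagging the state key with the index of the grouping set is an assumption made here to keep a generated null apart from a real null value.

```python
def fmt(kind, key, total):
    fields = ["null" if v is None else str(v) for v in key] + [str(total)]
    return f"{kind}[{', '.join(fields)}]"


def grouping_sets_changelog(rows, columns, dims, grouping_sets):
    counts = {}  # (grouping set index, key) -> running COUNT(*)
    out = []
    for row in rows:
        record = dict(zip(columns, row))
        for set_id, group_cols in enumerate(grouping_sets):
            # Columns outside the current grouping set become None (SQL NULL).
            key = tuple(record[d] if d in group_cols else None for d in dims)
            old = counts.get((set_id, key), 0)
            counts[(set_id, key)] = old + 1
            if old == 0:
                out.append(fmt("+I", key, 1))
            else:
                out.append(fmt("-U", key, old))
                out.append(fmt("+U", key, old + 1))
    return out


products = [
    ("supplier1", "product1", 4),
    ("supplier1", "product2", 3),
    ("supplier2", "product3", 3),
    ("supplier2", "product4", 4),
]
gs_changes = grouping_sets_changelog(
    products,
    columns=("supplier_id", "product_id", "rating"),
    dims=("supplier_id", "product_id"),
    grouping_sets=[("supplier_id", "product_id"), ("supplier_id",), ()],
)
for line in gs_changes:
    print(line)
```

Each input row touches three aggregates here, one per grouping set, which is why the changelog is several times longer than the four input rows.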
2. ROLLUP
ROLLUP is shorthand for a hierarchical set of grouping sets. GROUPING SETS ((supplier_id, product_id), (supplier_id), ()) can be shortened to ROLLUP (supplier_id, product_id).
Sink table:
CREATE TABLE sink_table (
supplier_id STRING,
product_id STRING,
total BIGINT
) WITH (
'connector' = 'print'
)
Data processing logic:
insert into sink_table
SELECT supplier_id, product_id, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY ROLLUP (supplier_id, product_id)
The result is equivalent to:
insert into sink_table
SELECT supplier_id, product_id, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
( supplier_id, product_id ),
( supplier_id ),
( )
)
The results are as follows:
+I[supplier1, product1, 1]
+I[supplier1, null, 1]
+I[null, null, 1]
+I[supplier1, product2, 1]
-U[supplier1, null, 1]
+U[supplier1, null, 2]
-U[null, null, 1]
+U[null, null, 2]
+I[supplier2, product3, 1]
+I[supplier2, null, 1]
-U[null, null, 2]
+U[null, null, 3]
+I[supplier2, product4, 1]
-U[supplier2, null, 1]
+U[supplier2, null, 2]
-U[null, null, 3]
+U[null, null, 4]
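The expansion from ROLLUP to grouping sets is mechanical: keep dropping the last column until nothing is left. A small Python sketch of that rule (the helper name `rollup` is made up for illustration and is not a Flink API):

```python
def rollup(*cols):
    """Return the grouping sets that ROLLUP(cols) expands to."""
    # Full column list first, then drop one trailing column at a time,
    # ending with the empty grouping set (the grand total).
    return [tuple(cols[:n]) for n in range(len(cols), -1, -1)]


rollup_sets = rollup("supplier_id", "product_id")
print(rollup_sets)
```

For n columns this yields n + 1 grouping sets, so ROLLUP (supplier_id, product_id) gives the three sets written out in the equivalent query above.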
3. CUBE
CUBE aggregates over every combination of the listed dimensions. For example, CUBE (a, b, c) groups by every possible subset of a, b, and c.
Sink table:
CREATE TABLE sink_table (
supplier_id STRING,
product_id STRING,
total BIGINT
) WITH (
'connector' = 'print'
)
Data processing logic:
insert into sink_table
SELECT supplier_id, product_id, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY CUBE (supplier_id, product_id)
It is equivalent to:
insert into sink_table
SELECT supplier_id, product_id, COUNT(*) AS total
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SETS (
( supplier_id, product_id ),
( supplier_id ),
( product_id ),
( )
)
The results are as follows:
+I[supplier1, product1, 1]
+I[supplier1, null, 1]
+I[null, product1, 1]
+I[null, null, 1]
+I[supplier1, product2, 1]
-U[supplier1, null, 1]
+U[supplier1, null, 2]
+I[null, product2, 1]
-U[null, null, 1]
+U[null, null, 2]
+I[supplier2, product3, 1]
+I[supplier2, null, 1]
+I[null, product3, 1]
-U[null, null, 2]
+U[null, null, 3]
+I[supplier2, product4, 1]
-U[supplier2, null, 1]
+U[supplier2, null, 2]
+I[null, product4, 1]
-U[null, null, 3]
+U[null, null, 4]
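CUBE expands to every subset of its columns, which is easy to enumerate with itertools. The helper name `cube` below is made up for illustration and is not a Flink API.

```python
from itertools import combinations


def cube(*cols):
    """Return the grouping sets that CUBE(cols) expands to."""
    sets = []
    # Largest subsets first, down to the empty set (the grand total).
    for size in range(len(cols), -1, -1):
        sets.extend(combinations(cols, size))
    return sets


cube_sets = cube("supplier_id", "product_id")
print(cube_sets)
```

The number of grouping sets doubles with each added column (2 to the power n), so every input row updates that many aggregates. That is worth keeping in mind before putting many dimensions into a CUBE.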
5. Summary and Outlook
This article covered common ETL and group aggregation metric scenarios and how they work under the hood. Flink SQL syntax is largely the same as Hive SQL, MySQL, and so on, so syntax is hardly an obstacle when picking up Flink SQL.
It also covered some tips for inspecting Flink SQL jobs:
- Open the Flink web UI to see what the job is currently doing. The operator names show directly which operator handles which logic.
- To find out what code your Flink job actually executes, inspect the Transformation that the SQL is finally translated into.
Future articles will continue with Flink SQL window aggregation, common misconceptions, and pitfall cases.