
1. Overview

  • Background and principles of Flink SQL & the Table API
  • The concept of dynamic tables
  • Commonly used SQL and built-in functions

Why do we need a relational API?

Flink achieves batch-stream unification through the Table API & SQL.

The Table API and SQL sit at the top of Flink's API stack and are the highest-level APIs Flink provides. Flink SQL is a development language with standard SQL semantics, designed by the Flink real-time computing team to simplify the computation model and lower the barrier to using real-time computing.

Dependencies:

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_2.11</artifactId>
			<version>1.11.1</version>
		</dependency>

		<!-- If you need the Scala API, add: -->
		<!--
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
			<version>1.11.1</version>
		</dependency>
		-->

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.11</artifactId>
			<version>1.11.1</version>
		</dependency>

2. Principles

In offline computing scenarios, Hive handles a large share of offline data processing, and its underlying SQL parsing relies on Apache Calcite.

Flink likewise delegates SQL parsing, optimization, and execution to Calcite.

Apache Calcite executes an SQL query in the following main steps:

  1. Parse: parse the SQL into an unvalidated abstract syntax tree (AST, Abstract Syntax Tree). The AST is a language-independent form, similar to the first step of cross-compilation.
  2. Validate: validate the AST, mainly checking whether the SQL statement is legal; the result of validation is a RelNode tree.
  3. Optimize: optimize the RelNode tree and generate a physical execution plan.
  4. Execute: convert the physical execution plan into platform-specific execution code, such as a Flink DataStream application.

As shown in the figure:

Both batch-query SQL and streaming-query SQL are converted by the Parser into an SQLNode tree, from which a Logical Plan is generated. The Logical Plan is then optimized into an executable physical plan, which is submitted to the DataSet or DataStream API for execution.

A complete Flink Table & SQL job is likewise composed of a Source, Transformations, and a Sink:

3. Dynamic tables

Compared with a traditional SQL query on a static table, Flink Table & SQL processes stream data whose contents change from moment to moment, hence the concept of the dynamic table.

A query on a dynamic table is written the same way as a query on a static table, but an SQL query over a dynamic table never terminates; it keeps updating its result.

For example, the Flink program accepts as input a Kafka stream that records the user’s purchases:

First, Kafka’s messages are constantly parsed into an ever-growing dynamic table, and the SQL we execute on the dynamic table constantly generates new dynamic tables as a result.

The procedure for executing an SQL query on a DataStream is as follows:

  1. Convert the DataStream into a dynamic table.
  2. Define a continuous query on the dynamic table.
  3. The real-time result of the continuous query forms a new dynamic table.
  4. Convert the dynamic table representing the query result back into a DataStream, so that the application can further transform the result with the DataStream API.
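The steps above can be sketched in plain Python (this is a simulation of the dynamic-table semantics, not the Flink API; the event fields and function names are illustrative): each arriving event updates the result of the continuous query, and the result "table" is never final.

```python
# Simulating a continuous query over a dynamic table: maintain a per-user
# purchase count that is updated by every incoming event.
from collections import defaultdict

def continuous_count(stream):
    """Yield a snapshot of the result 'table' after each purchase event."""
    result = defaultdict(int)  # user -> number of purchases so far
    for event in stream:
        result[event["user"]] += 1   # the query result updates, never terminates
        yield dict(result)           # snapshot of the dynamic result table

purchases = [{"user": "Mary"}, {"user": "Bob"}, {"user": "Mary"}]
snapshots = list(continuous_count(purchases))
print(snapshots[-1])  # {'Mary': 2, 'Bob': 1}
```

Each snapshot corresponds to one state of the dynamic result table; in Flink, the sequence of updates would itself be a stream.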

4. Flink Table & SQL operators and built-in functions

A simple example:

SELECT * FROM Table;
SELECT name, age FROM Table;


SELECT name, age FROM Table WHERE name LIKE '%Xiao Ming%';
SELECT * FROM Table WHERE age = 20;
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)




SELECT *
FROM User LEFT JOIN Product ON User.name = Product.buyer

SELECT *
FROM User RIGHT JOIN Product ON User.name = Product.buyer

SELECT *
FROM User FULL OUTER JOIN Product ON User.name = Product.buyer

Windows

Based on how window data is partitioned, Apache Flink currently offers the following three window types:

  • Tumbling windows: the window has a fixed size, and windows do not overlap;

  • Sliding windows: the window has a fixed size, plus a slide interval at which new windows are created, so windows may overlap;

  • Session windows: the window has no fixed size; its bounds are determined by a user-supplied inactivity gap, and windows do not overlap;

1) Tumbling windows

A tumbling window has a fixed size, and the data in different windows does not overlap, as shown in the figure below:

The syntax is as follows:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)


For example, we need to calculate the number of orders per user per day:

SELECT user, TUMBLE_START(timeLine, INTERVAL '1' DAY) as winStart, SUM(amount) 
FROM Orders 
GROUP BY TUMBLE(timeLine, INTERVAL '1' DAY), user;

TUMBLE_START and TUMBLE_END return the start time and end time of a window. In TUMBLE(timeLine, INTERVAL '1' DAY), timeLine is the column used as the time attribute, and INTERVAL '1' DAY sets the window size to one day.
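The tumbling-window assignment can be sketched in plain Python (a simulation of the semantics, not Flink code; field names are illustrative): each timestamp falls into exactly one fixed-size, non-overlapping window, and aggregation groups by (window start, user), mirroring GROUP BY TUMBLE(timeLine, size), user.

```python
# Tumbling-window semantics: every timestamp belongs to exactly one window.
def tumble_start(ts, size):
    """Start of the tumbling window that contains timestamp ts."""
    return ts - (ts % size)

DAY = 24 * 60 * 60  # window size: one day, in seconds

orders = [
    {"user": "a", "ts": 10,      "amount": 2},
    {"user": "a", "ts": 3600,    "amount": 3},  # same day as the first order
    {"user": "a", "ts": DAY + 5, "amount": 7},  # next day -> next window
]

# GROUP BY TUMBLE(ts, size), user -> one aggregate row per (window, user)
agg = {}
for o in orders:
    key = (tumble_start(o["ts"], DAY), o["user"])
    agg[key] = agg.get(key, 0) + o["amount"]

print(agg)  # {(0, 'a'): 5, (86400, 'a'): 7}
```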

2) Sliding windows

Unlike tumbling windows, a sliding window has, in addition to a fixed size, a slide parameter that controls how often a new window is created. Note that when the slide is smaller than the window size, data can fall into several overlapping sliding windows, with the following semantics:

The syntax of a sliding window adds only one slide parameter to that of a tumbling window:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)


For example, suppose we compute, once every hour, the sales of each item over the past 24 hours:

SELECT product, SUM(amount) 
FROM Orders 
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product

In the preceding example, INTERVAL '1' HOUR is the slide, i.e. the interval at which new sliding windows are created, and INTERVAL '1' DAY is the window size.
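A plain-Python sketch of the HOP semantics (a simulation, not Flink code; the helper name is made up): with size greater than slide, a single event belongs to size / slide overlapping windows, which is why the same order contributes to several hourly results.

```python
# Sliding (HOP) window semantics: find all windows [start, start + size)
# that contain a given timestamp, stepping backwards by the slide.
def hop_window_starts(ts, slide, size):
    """All window start times whose [start, start+size) interval contains ts."""
    start = ts - (ts % slide)   # latest window starting at or before ts
    starts = []
    while start > ts - size:    # window still covers ts
        starts.append(start)
        start -= slide
    return sorted(starts)

# An event at t=5 with slide=2 and size=6 lands in 3 overlapping windows:
# [0, 6), [2, 8), and [4, 10).
print(hop_window_starts(5, slide=2, size=6))  # [0, 2, 4]
```

Note that the number of windows per event is size // slide, so a 24-hour window with a 1-hour slide assigns each order to 24 windows.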

3) Session window

The session window defines an inactive time. If no event or message occurs within the specified interval, the session window closes.

The syntax of a session window is as follows:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)

For example, we need to calculate the number of orders per user in the past hour:

SELECT user, SESSION_START(rowtime, INTERVAL '1' HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL '1' HOUR) AS sEnd, SUM(amount) 
FROM Orders 
GROUP BY SESSION(rowtime, INTERVAL '1' HOUR), user
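The session-window behavior can be sketched in plain Python (a simulation of the semantics, not Flink code; the function name is made up): a window stays open as long as events keep arriving within the gap, and a pause longer than the gap closes it and opens a new one.

```python
# Session-window semantics: split a sorted event stream into sessions that
# are closed after `gap` time units of inactivity.
def sessionize(timestamps, gap):
    """Group sorted timestamps into sessions separated by gaps > `gap`."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)   # still within the active session
        else:
            sessions.append([ts])     # inactivity gap exceeded: new session
    return sessions

# With a 1-hour (3600 s) gap, the 2-hour pause before t=9000 starts a new
# session, so the three events form two session windows.
print(sessionize([0, 1800, 9000], gap=3600))  # [[0, 1800], [9000]]
```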