Flink SQL is a development language with standard SQL semantics designed by the Flink real-time computing team to simplify the computation model and lower the barrier to using real-time computing. Starting in 2015, Alibaba investigated open-source computing engines and ultimately decided to build its next-generation computing engine on top of Flink, optimizing and improving on Flink's shortcomings; the resulting code, known as Blink, was open-sourced in early 2019. One of Blink's most notable contributions over the original Flink is its implementation of Flink SQL.
Flink SQL is a user-facing API layer. In traditional streaming engines such as Storm and Spark Streaming, users write business logic in Java or Scala against function-level or DataStream-style APIs. While this approach is flexible, it has some drawbacks: a relatively high barrier to entry, difficulty in tuning, and many API incompatibilities as versions are updated.
Against this background, SQL is without doubt the best choice for us. We chose SQL as our core API because it has several very important characteristics:
- SQL is a declarative (set-oriented) language: users only need to express their needs clearly and do not need to understand how the computation is carried out;
- SQL can be optimized: built-in query optimizers translate SQL into an optimal execution plan;
- SQL is easy to understand, is understood by people in different industries and fields, and has a low learning cost;
- SQL is very stable: SQL itself has changed little in the database's 30-plus-year history;
- Stream and batch unification: Flink's underlying Runtime is itself a unified stream-and-batch engine, and SQL enables stream-batch unification at the API layer.
1. Common Flink SQL operators
SELECT:
SELECT is used to select data from a DataSet/DataStream and to project out specific columns.
Example:
SELECT * FROM Table; // Retrieve all columns in the table
SELECT name, age FROM Table; // Select name and age from table
Functions and aliases can also be used in SELECT statements, for example the WordCount query mentioned earlier:
SELECT word, COUNT(word) FROM table GROUP BY word;
WHERE:
WHERE is used to filter data from a DataSet/DataStream. It is used together with SELECT to split a relation horizontally based on some condition, that is, to select only the records that meet the condition.
Example:
SELECT name, age FROM Table WHERE name LIKE '%xiaoming%';
SELECT * FROM Table WHERE age = 20;
Flink SQL also supports combining the expressions =, <, >, <>, >=, <=, AND, and OR in the WHERE condition; the data that satisfies the filter condition is returned. WHERE can also be used together with IN and NOT IN. Here's an example:
SELECT name, age
FROM Table
WHERE name IN (SELECT name FROM Table2)
DISTINCT:
DISTINCT is used to de-duplicate data sets/streams based on the results of the SELECT.
Example:
SELECT DISTINCT name FROM Table;
For streaming queries, the state required to compute the query result can grow indefinitely, so users need to bound the state retained by the query themselves to prevent it from becoming too large, as sketched below.
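Here is a minimal sketch of how such state can be bounded with an idle state retention time, using the legacy StreamQueryConfig API that matches the Flink version used in the practice cases later in this article; the table name t_user and the sample data are made up for illustration:
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
object DistinctStateDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Hypothetical in-memory source of (name, age) records
    val users: DataStream[(String, Int)] =
      env.fromElements(("xiaoming", 20), ("xiaoming", 20), ("lisi", 25))
    tableEnv.registerDataStream("t_user", users, 'name, 'age)
    // Bound the state kept by the DISTINCT query: state idle for more than
    // one hour may be cleaned up; two hours is the guaranteed upper bound
    val queryConfig = tableEnv.queryConfig
      .withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
    val result = tableEnv.sqlQuery("SELECT DISTINCT name FROM t_user")
    tableEnv.toRetractStream[Row](result, queryConfig).print()
    env.execute("DistinctStateDemo")
  }
}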
GROUP BY:
GROUP BY is used to group data. For example, we might need to calculate each student's total score in a score table.
Example:
SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
UNION and UNION ALL:
UNION is used to combine two result sets, requiring that the two result sets have exactly the same fields, including field types and field order. Unlike UNION ALL, UNION deduplicates the result data: if the same row appears in both inputs, UNION emits it once, while UNION ALL emits it twice.
Example:
SELECT * FROM T1 UNION SELECT * FROM T2;
SELECT * FROM T1 UNION ALL SELECT * FROM T2;
JOIN:
JOIN is used to JOIN data from two tables to form a result table. The JOIN types supported by Flink include:
JOIN - INNER JOIN
LEFT JOIN - LEFT OUTER JOIN
RIGHT JOIN - RIGHT OUTER JOIN
FULL JOIN - FULL OUTER JOIN
The semantics of the JOIN here are the same as the JOIN semantics we use in relational databases.
Example:
JOIN (associate order table data with item table)
SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
The difference between LEFT JOIN and JOIN is that when the right table has no row matching the left table, the corresponding fields from the right side are output as NULL. RIGHT JOIN is the mirror image of LEFT JOIN, with the roles of the two tables swapped. FULL JOIN is roughly equivalent to performing a RIGHT JOIN and a LEFT JOIN and then applying UNION ALL to the two results.
Example:
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id
SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
Group Window:
Based on how window data is divided, Apache Flink supports the following three kinds of bounded windows:
Tumble: tumbling window; window data has a fixed size and windows do not overlap;
Hop: sliding window; window data has a fixed size, new windows are created at a fixed slide interval, and window data can overlap;
Session: session window; window data has no fixed size, windows are divided according to the activity of the data, and window data does not overlap.
Tumble Window:
A Tumble (tumbling) window has a fixed size, and its window data does not overlap. The syntax of a tumbling window is as follows:
SELECT
[gk],
[TUMBLE_START(timeCol, size)],
[TUMBLE_END(timeCol, size)],
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
Where:
[gk] determines whether the aggregation needs to be grouped by certain fields;
TUMBLE_START indicates the window start time;
TUMBLE_END indicates the window end time;
timeCol is the time field in the stream table;
size indicates the window size, in units such as seconds, minutes, hours, or days.
For example, if we want to calculate the daily order volume of each person, we can aggregate the orders by user:
SELECT user,
TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user;
Hop Window:
The Hop (sliding) window is similar to the tumbling window in that the window has a fixed size. Unlike the tumbling window, the sliding window controls how often a new window is created through the slide parameter; therefore, when the slide value is smaller than the window size, multiple sliding windows overlap. The syntax of the Hop sliding window is as follows:
SELECT
[gk],
[HOP_START(timeCol, slide, size)] ,
[HOP_END(timeCol, slide, size)],
agg1(col1),
...
aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
The fields have meanings similar to those of the Tumble window:
[gk] determines whether the aggregation needs to be grouped by certain fields;
HOP_START indicates the window start time;
HOP_END indicates the window end time;
timeCol indicates the time field in the stream table;
slide indicates the slide step, i.e. how often a new window is created;
size indicates the size of the entire window, in units such as seconds, minutes, hours, or days.
For example, every hour we calculate the sales amount of each product over the past 24 hours:
SELECT product,
SUM(amount)
FROM Orders
GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
Session Window:
A session window has no fixed duration; its boundary is defined by an inactivity interval, which means that if no event occurs within the defined gap, the session window closes. The syntax of the Session window is as follows:
SELECT
[gk],
SESSION_START(timeCol, gap) AS winStart,
SESSION_END(timeCol, gap) AS winEnd,
agg1(col1),
...
aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
[gk] determines whether the aggregation needs to be grouped by certain fields;
SESSION_START indicates the window start time;
SESSION_END indicates the window end time;
timeCol indicates the time field in the stream table;
gap indicates the length of the inactivity period of the window data.
For example, we calculate each user's order amount within sessions separated by 12 hours of inactivity:
SELECT user,
SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS sEnd,
SUM(amount)
FROM Orders
GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user
The Table API and SQL are bundled in the flink-table Maven artifact. You must add the following dependency to your project to use the Table API and SQL:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
In addition, you need to add a dependency for Flink's Scala batch or streaming API. For batch queries, you need to add the following (the streaming dependency is shown after it):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
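For the streaming practice case in the next section, the Scala streaming API dependency is also needed. As a hedged sketch, a typical entry uses the standard flink-streaming-scala artifact; adjust the Scala suffix and version to match your build:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>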
2. Flink SQL practice case
1) Batch data SQL
Usage:
- Build the Table running environment
- Register the DataSet as a table
- Use the sqlQuery method of the Table runtime environment to execute SQL statements
Example: Flink SQL is used to count the total amount, maximum amount, minimum amount, and total number of orders.
Order id | User name | Order date | Amount |
---|---|---|---|
1 | zhangsan | 2018-10-20 16:30 | 358.5 |
Test data (order ID, user name, order date, order amount) :
Order(1, "zhangsan", "2018-10-20 16:30", 358.5), Order(2, "zhangsan", "2018-10-20 16:30", 131.5), The ticket is 2018-10-20, 127.5), the Order (4, "lisi", "the 2018-10-20 ticket", 328.5), the Order (5, "lisi", "the 2018-10-20 ticket", 432.5). Order(6, "zhaoliu", "2018-10-20 22:30", 362.0), Order(8, "zhaoliu", "2018-10-20 22:30", 362.0), "2018-10-20 22:30", 364.0), Order(9, "2018-10-20 22:30", 341.0)Copy the code
Steps:
- Get a batch run environment
- Get a Table runtime environment
- Create a sample class Order to map the data (order id, user name, order date, order amount)
- Create a DataSet source based on the local Order collection
- Use the Table runtime environment to register the DataSet as a Table
- Use SQL statements to manipulate the data (count each user's total amount, maximum amount, minimum amount, and total number of orders)
- Use tableEnv.toDataSet to convert the Table to a DataSet
- Print test
Sample code:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
/**
 * Use Flink SQL to count each user's total order amount, maximum amount, minimum amount, and total number of orders.
 */
object BatchFlinkSqlDemo {
// Create a sample class Order to map the data (order id, user name, order date, order amount)
case class Order(id: Int, userName: String, createTime: String, money: Double)
def main(args: Array[String]): Unit = {
/**
 * Steps:
 * 1. Obtain a batch runtime environment
 * 2. Obtain a Table runtime environment
 * 3. Create a sample class Order to map the data (order id, user name, order date, order amount)
 * 4. Create a DataSet source based on a local Order collection
 * 5. Use the Table runtime environment to register the DataSet as a Table
 * 6. Use SQL statements to manipulate the data (count each user's total amount, maximum amount, minimum amount and total number of orders)
 * 7. Use tableEnv.toDataSet to convert the Table to a DataSet
 * 8. Print the result
 */
//1. Obtain a batch runtime environment
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//2. Obtain a Table running environment
val tabEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
//4. Create a DataSet source based on a local Order collection
val orderDataSet: DataSet[Order] = env.fromElements(
  Order(1, "zhangsan", "2018-10-20 16:30", 358.5),
  Order(2, "zhangsan", "2018-10-20 16:30", 131.5),
  Order(3, "lisi", "2018-10-20 16:30", 127.5),
  Order(4, "lisi", "2018-10-20 16:30", 328.5),
  Order(5, "lisi", "2018-10-20 16:30", 432.5),
  Order(6, "zhaoliu", "2018-10-20 22:30", 451.0),
  Order(7, "zhaoliu", "2018-10-20 22:30", 362.0),
  Order(8, "zhaoliu", "2018-10-20 22:30", 364.0),
  Order(9, "zhaoliu", "2018-10-20 22:30", 341.0)
)
//5. Use the Table runtime environment to register the DataSet as a Table
tabEnv.registerDataSet("t_order", orderDataSet)
//6. Use SQL statement to manipulate data (count total amount, maximum amount, minimum amount, total number of orders)
// Total amount, maximum amount, minimum amount and total number of orders.
val sql =
""" | select | userName, | sum(money) totalMoney, | max(money) maxMoney, | min(money) minMoney, | count(1) totalCount | from t_order | group by userName |""".stripMargin / / in scala stripMargin default is "|" as a multi-line connector
//7. Use tableEnv.toDataSet to convert the Table to a DataSet
val table: Table = tabEnv.sqlQuery(sql)
table.printSchema()
tabEnv.toDataSet[Row](table).print()
}
}
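For reference, with the test data above, the query groups the nine orders by the three users, so the printed DataSet should contain rows equivalent to the following (row order may vary, and the exact formatting depends on Row.toString):
zhangsan,490.0,358.5,131.5,2
lisi,888.5,432.5,127.5,3
zhaoliu,1518.0,451.0,341.0,4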
2) Stream data SQL
SQL can also be supported in stream processing. However, the following points should be noted:
- To use SQL on a stream, you must assign watermarks / an event-time attribute
- When registering the table with registerDataStream, fields are specified with a leading single quote (a Scala Symbol), e.g. 'orderId
- When registering a table, you must specify a rowtime field, otherwise you cannot use windows in SQL
- You must import org.apache.flink.table.api.scala._ for the implicit conversions
- In SQL, tumble(time column name, interval 'duration' second) is used to define the window
Example: Use Flink SQL to count the total number of orders, the maximum amount of orders, and the minimum amount of orders from users within 5 seconds.
Steps:
- Gets the flow processing runtime environment
- Obtain the Table operating environment
- Set the time characteristic to EventTime
- Create an Order sample class Order with four fields (Order ID, user ID, Order amount, and timestamp)
- Create a custom data source
- Generate 1000 orders using the for loop
- Randomly generate order IDS (UUID)
- Random generation of user IDS (0-2)
- Randomly generated order amount (0-100)
- The timestamp is the current system time
- An order is generated every 1 second
- Add watermark, allow a delay of 2 seconds
- Import org.apache.flink.table.api.scala._ for the implicit conversions
- Use the registerDataStream registry and specify the fields separately, as well as the RowTime field
- When writing the SQL statement that counts each user's total orders, maximum amount, and minimum amount, use tumble(time column, interval 'window length' second) in the GROUP BY to create the window
- Execute the SQL statement with tableEnv.sqlQuery
- Convert the SQL execution result to DataStream and print it
- Start the stream handler
Sample code:
import java.util.UUID
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.types.Row
import scala.util.Random
/**
 * Use Flink SQL to count, within 5-second windows, each user's total number of orders, maximum order amount, and minimum order amount.
 */
object StreamFlinkSqlDemo {
/**
 * Steps:
 * 1. Obtain the stream processing runtime environment
 * 2. Obtain a Table runtime environment
 * 3. Set the time characteristic to EventTime
 * 4. Create an Order sample class with four fields (order id, user id, order amount, timestamp)
 * 5. Create a custom data source:
 *    - use a for loop to generate 1000 orders
 *    - randomly generate the order id (UUID)
 *    - randomly generate the user id (0-2)
 *    - randomly generate the order amount (0-100)
 *    - use the current system time as the timestamp
 *    - emit one order every second
 * 6. Assign watermarks, allowing a delay of 2 seconds
 * 7. Import org.apache.flink.table.api.scala._ for the implicit conversions
 * 8. Register the table with registerDataStream, specifying the fields and the rowtime field
 * 9. Write the SQL statement that counts each user's total orders, maximum and minimum amounts,
 *    using tumble(time column, interval 'window length' second) in the GROUP BY to create the window
 * 10. Execute the SQL statement with tableEnv.sqlQuery
 * 11. Convert the SQL result to a DataStream and print it
 * 12. Start the streaming program
 */
// create an Order sample class 'Order' with four fields (Order ID, user ID, Order amount, timestamp)
case class Order(orderId:String, userId:Int, money:Long, createTime:Long)
def main(args: Array[String]): Unit = {
// 1. Create the flow processing runtime environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 2. Set processing time to 'EventTime'
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Get the operating environment of the table
val tableEnv = TableEnvironment.getTableEnvironment(env)
// create a custom data source
val orderDataStream = env.addSource(new RichSourceFunction[Order] {
var isRunning = true
override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
// - Generate order ID (UUID) randomly
// - Generate user ID randomly (0-2)
// - Randomly generated order amount (0-100)
// - The timestamp is the current system time
// - Generates an order every 1 second
for (i <- 0 until 1000 if isRunning) {
val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101),
System.currentTimeMillis())
TimeUnit.SECONDS.sleep(1)
ctx.collect(order)
}
}
override def cancel(): Unit = { isRunning = false }
})
// 5. Assign watermarks, allowing a delay of 2 seconds
val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[Order] (Time.seconds(2)) {
override def extractTimestamp(element: Order) :Long = {
val eventTime = element.createTime
eventTime
}
}
)
// 6. Import org.apache.flink.table.api.scala._ for the implicit conversions
// 7. Register the table with registerDataStream, specifying the fields and the rowtime field
import org.apache.flink.table.api.scala._
tableEnv.registerDataStream("t_order", watermarkDataStream, 'orderId.'userId.'money.'createTime.rowtime)
// 8. Compile SQL statements to count the total amount, maximum amount and minimum amount of user orders
// - Use tumble(time column, interval 'window length' second) in the GROUP BY to create the window
val sql =
""" |select | userId, | count(1) as totalCount, | max(money) as maxMoney, | min(money) as minMoney | from | t_order | group by | tumble(createTime, interval '5' second), | userId """.stripMargin
// 9. Use tableEnv.sqlQuery to execute the SQL statement
val table: Table = tableEnv.sqlQuery(sql)
// 10. Convert the SQL execution result to DataStream and print it
table.toRetractStream[Row].print()
env.execute("StreamSQLApp")}}Copy the code