Preface
This article mainly studies Flink's Table API and SQL Programs
Example
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2",...). ; // or tableEnv.registerExternalCatalog("extCat",...). ; // register an output Table tableEnv.registerTableSink("outputTable",...). ; // create a Table from a Table API query Table tapiResult = tableEnv.scan("table1").select(...) ; // create a Table from a SQL query Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");
// execute
env.execute();
- This example shows the basic structure of Flink's Table API and SQL Programs
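As a concrete version of this skeleton, here is a minimal runnable sketch; the in-memory source, the field names id/word, and the printed sink are illustrative stand-ins for the "..." placeholders above:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableProgramSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // a tiny in-memory source standing in for a real TableSource
        DataStream<Tuple2<Long, String>> stream = env.fromElements(
                Tuple2.of(1L, "hello"), Tuple2.of(2L, "world"));
        tableEnv.registerDataStream("table1", stream, "id, word");

        // a Table API query and a SQL query over the same table
        Table tapiResult = tableEnv.scan("table1").select("id, word");
        Table sqlResult = tableEnv.sqlQuery("SELECT id, word FROM table1 WHERE id > 1");

        // print instead of registering a TableSink
        tableEnv.toAppendStream(tapiResult, Row.class).print();
        tableEnv.toAppendStream(sqlResult, Row.class).print();

        env.execute();
    }
}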
Table API example
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// emit or convert Table
// execute query
- Create a Table with the tableEnv.scan method, then apply the Table's query APIs such as filter, groupBy, and select
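The comments above elide how the Orders table gets registered; here is a sketch, assuming a (cID, cName, cCountry, revenue) schema backed by an in-memory stream (needs import org.apache.flink.api.java.tuple.Tuple4):
// hypothetical in-memory Orders source
DataStream<Tuple4<Long, String, String, Integer>> orderStream = env.fromElements(
        Tuple4.of(1L, "Alice", "FRANCE", 100),
        Tuple4.of(1L, "Alice", "FRANCE", 50),
        Tuple4.of(2L, "Bob", "GERMANY", 70));
tableEnv.registerDataStream("Orders", orderStream, "cID, cName, cCountry, revenue");

// a grouped aggregation on a stream is continuously updating,
// so emit the result as a retract stream rather than an append stream
tableEnv.toRetractStream(revenue, Row.class).print();
env.execute();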
sqlQuery example
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
- sqlQuery is implemented internally on top of Apache Calcite
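Because sqlQuery validates the statement against tables registered in the catalog, a Table object can also be embedded directly into the SQL string; the string concatenation invokes its toString(), which registers the table under a generated unique name:
// mix Table API and SQL: the Table is implicitly registered via toString()
Table orders = tableEnv.scan("Orders");
Table result = tableEnv.sqlQuery(
        "SELECT cID, cName, SUM(revenue) AS revSum FROM " + orders +
        " WHERE cCountry = 'FRANCE' GROUP BY cID, cName");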
sqlUpdate example (TableSink)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// execute query
- After registering an output table backed by a TableSink, you can write to it with the TableEnvironment's sqlUpdate method
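The elided "register RevenueFrance output table" step could look like the following sketch, assuming a CsvTableSink, an illustrative file path, and a schema matching the SELECT list (needs import org.apache.flink.api.common.typeinfo.TypeInformation, Types and org.apache.flink.table.sinks.CsvTableSink):
// hypothetical schema for the output table; the types assume revenue is INT
String[] fieldNames = {"cID", "cName", "revSum"};
TypeInformation[] fieldTypes = {Types.LONG, Types.STRING, Types.INT};
tableEnv.registerTableSink("RevenueFrance", fieldNames, fieldTypes,
        new CsvTableSink("/tmp/revenue_france.csv", "|"));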
insertInto example (TableSink)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", "|"); // "|" is the field delimiter
// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");
// execute the program
- Write the result to the output table with the Table.insertInto method
DataStream (or DataSet) and Table
Register a DataStream as a Table
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
- Register a DataStream as a Table via StreamTableEnvironment.registerDataStream
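Once registered, the table name is visible to both APIs; a small sketch using the field names declared above:
// query the registered table from the Table API and from SQL
Table byApi = tableEnv.scan("myTable2").select("myLong");
Table bySql = tableEnv.sqlQuery("SELECT myString FROM myTable2");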
Convert a DataStream to a Table
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
- Here StreamTableEnvironment.fromDataStream converts a DataStream into a Table; unlike registerDataStream, the Table is returned directly instead of being registered under a name
Convert a Table to a DataStream
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
- Here StreamTableEnvironment.toAppendStream and toRetractStream convert a Table into a DataStream
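Since a grouped aggregation can update results it has already emitted, toAppendStream would reject such a Table; here is a sketch of consuming the retract stream from the block above, keeping only the insert messages:
// drop retraction messages (f0 == false) and strip the flag
DataStream<Row> inserts = retractStream
        .filter(msg -> msg.f0)
        .map(msg -> msg.f1)
        .returns(Row.class);
inserts.print();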
Convert a Table to a DataSet
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
- Here BatchTableEnvironment.toDataSet converts a Table into a DataSet
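For completeness, a minimal runnable batch sketch around the snippet above, with an illustrative in-memory DataSet:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class TableToDataSetSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // tiny in-memory input standing in for the "..." above
        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                Tuple2.of("alice", 30), Tuple2.of("bob", 25));
        Table table = tableEnv.fromDataSet(input, "name, age");

        DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);
        rows.print(); // print() triggers execution for batch programs
    }
}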
Mapping Data Types to Table Schema
Position-based Mapping (Tuple type)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// -- Tuple type --
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
- Position-based mapping requires that the newly specified field names do not match any field name of the input data type, otherwise the API assumes name-based mapping. If no field names are specified, the type's default names starting from f0 are used. This mode works for Tuple and Row types, but not for POJOs; a short contrast sketch follows
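A sketch contrasting the two interpretations for the Tuple stream above: fresh names trigger position-based renaming, while reusing the original f0/f1 names is treated as name-based projection:
// position-based: new names replace f0, f1 in order
Table renamed = tableEnv.fromDataStream(stream, "myLong, myInt");
// name-based: referencing the existing names f0, f1 reorders/projects instead
Table swapped = tableEnv.fromDataStream(stream, "f1, f0");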
Name-based Mapping (POJO type)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// -- Tuple type --
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");
// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong"); // Person is a POJO with fields"name" and "age"
DataStream<Person> stream = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
- Both Tuple and POJO types can use name-based mapping, and fields can be projected or renamed with as; a minimal Person definition is sketched below
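The Person class used above is not shown; a minimal definition satisfying Flink's POJO rules (public class, public no-argument constructor, public fields or getters/setters) might look like:
// minimal POJO sketch; the field names drive name-based mapping
public class Person {
    public String name;
    public Integer age;

    public Person() {} // no-arg constructor required by Flink

    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }
}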
Atomic type
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Long> stream = ...
// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
- An atomic type is converted into a Table with a single column; the field name can optionally be specified, as in the SQL sketch below
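A small follow-up sketch, registering the atomic stream under an assumed name so it can also be queried with SQL:
// "Longs" and "myLong" are illustrative names
tableEnv.registerDataStream("Longs", stream, "myLong");
Table positives = tableEnv.sqlQuery("SELECT myLong FROM Longs WHERE myLong > 0");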
Row type
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...
// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
- The Row type supports an arbitrary number of fields and allows fields to be null. Field names can be specified via a RowTypeInfo, and both position-based and name-based mapping can be used; see the sketch below for how such a stream can be built
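How a DataStream<Row> with named fields can be produced is not shown above; one way is this sketch using RowTypeInfo plus returns() (names and values illustrative; needs import org.apache.flink.api.common.typeinfo.TypeInformation, Types and org.apache.flink.api.java.typeutils.RowTypeInfo):
// the field names "name" and "age" travel with the RowTypeInfo
RowTypeInfo rowType = new RowTypeInfo(
        new TypeInformation[]{Types.STRING, Types.INT},
        new String[]{"name", "age"});

DataStream<Row> stream = env
        .fromElements(Tuple2.of("alice", 30), Tuple2.of("bob", 25))
        .map(t -> Row.of(t.f0, t.f1))
        .returns(rowType);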
Summary
The basic usage of Flink's Table API and SQL Programs:
- First create a TableEnvironment (BatchTableEnvironment or StreamTableEnvironment), then create a Table or TableSource and register it with the catalog (the default catalog is internal; you can also register an external catalog yourself), and then query and transform the table
- A Table can also be created by converting a DataSet or DataStream. A Table can be queried with the Table API (e.g. the scan method), with SQL (the sqlQuery method), or with a mixture of both
- The resulting Table can be converted back to a DataSet or DataStream for further processing. To write the output to a table, register a TableSink and write to it with the TableEnvironment's sqlUpdate method or the Table's insertInto method
doc
- Table API & SQL Concepts & Common API