Table API and SQL are higher-level APIs that Flink provides on top of DataStream. They encapsulate DataStream and offer the following advantages:

  • SQL is supported directly
  • Programs written with the Table API are much simpler (see the short sketch after this list)
  • Flink applies optimizations to Table API programs, so they can run more efficiently
  • The Table API is designed for unified stream-batch processing and works with both streaming and batch data.
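As a taste of the first two points, here is a minimal hedged sketch: the table name `orders`, its columns, and the environment variable `tEnv` are assumptions (environment creation is shown later in this article), and `$` comes from a static import of `org.apache.flink.table.api.Expressions.$`. Both snippets express the same aggregation:

// SQL style: run a query against a registered table named "orders"
Table sqlResult = tEnv.sqlQuery(
        "SELECT `user`, SUM(amount) AS total FROM orders GROUP BY `user`");

// Table API style: the same aggregation expressed with the fluent API
Table apiResult = tEnv.from("orders")
        .groupBy($("user"))
        .select($("user"), $("amount").sum().as("total"));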

Table concepts in the Table API

In the Table API and SQL, Flink treats data as a Table: stream data is an infinitely growing (append-only) Table, while batch data is a finite Table. Since it is a table, it has a schema, which can be specified explicitly or inferred from the data source.

The hierarchy of tables

The hierarchy of tables in Flink is catalog -> database -> table.

The catalog can be based on memory or on an external storage system such as Hive or PostgreSQL.

  • If it is memory-based, the databases and tables in the catalog are temporary, and their metadata is destroyed once the session ends.
  • If it is based on an external storage system, the tables are persistent: they can be shared by multiple Flink clusters and used across multiple sessions (see the sketch after this list).
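To make the hierarchy concrete, here is a small hedged sketch that works against the default in-memory catalog; the table name and columns are invented, and `tEnv` is a `TableEnvironment` created as shown later in this article:

// The default in-memory catalog and database created by Flink
tEnv.useCatalog("default_catalog");
tEnv.useDatabase("default_database");

// A table can be addressed by its fully qualified name: catalog.database.table
tEnv.executeSql(
        "CREATE TABLE default_catalog.default_database.clicks (" +
        "  user_name STRING," +
        "  url STRING" +
        ") WITH ('connector' = 'datagen')");

// Inspect what exists at each level of the hierarchy
String[] catalogs = tEnv.listCatalogs();
String[] databases = tEnv.listDatabases();
String[] tables = tEnv.listTables();

// For persistent, shareable tables, register an external catalog instead,
// e.g. tEnv.registerCatalog("hive", new HiveCatalog(...)) with the Hive connector on the classpath.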

Views are also supported in Flink. A view can be created from an existing Table object and typically holds the result of a Table API or SQL query. Tables, by contrast, typically describe external data, such as files, database tables, or message queues.
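For illustration, a minimal hedged sketch of this distinction (the source table `sensor_readings` and its columns are assumed to be registered elsewhere):

// A view wraps the result of a query over an existing table
Table highReadings = tEnv.from("sensor_readings")
        .filter($("reading").isGreater(100.0));
tEnv.createTemporaryView("high_readings", highReadings);

// The view can now be referenced in SQL like any other table
Table ids = tEnv.sqlQuery("SELECT sensor_id FROM high_readings");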

Creating a Table API project in a local IDE

  1. Create a Maven project and add dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.2</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
  2. Create the Table API environment:
// Mode 1: create the environment from EnvironmentSettings (modes 1 and 2 are alternatives; pick one)
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inStreamingMode()
        //.inBatchMode()
        .build();

TableEnvironment tEnv = TableEnvironment.create(settings);

// Mode 2: build the table environment from a StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Table API and SQL logic goes here: source, transform, sink

// Execute the job.
// Note: executeSql()/executeInsert() on the table environment submit the job by themselves;
// an explicit env.execute() is only needed when the pipeline ends in the DataStream API.
env.execute();

Converting between the Table API and DataStream

DataStream to Table

fromDataStream(DataStream, Expression…)

fromDataStream(DataStream, String)

fromDataStream(DataStream, Schema)

//fromDataStream(DataStream): no field names given, so fields are named f0, f1, ...
tableEnv.fromDataStream(stream);
//fromDataStream(DataStream, String) 
tableEnv.fromDataStream(stream, "field-name1, field-name2.sub-name1, field-name3");
//fromDataStream(DataStream, Expression...)  
tableEnv.fromDataStream(stream, $("field-name1"), $("field-name2.sub-name1"), $("field-name3"));
//fromDataStream(DataStream, Schema)  
tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        //.columnByMetadata("time1", "TIMESTAMP_LTZ(3)")
        //.column("field1", "STRING")
        //.column("field2", "TIMESTAMP_LTZ(3)")
        //.column("f0", DataTypes.of(User.class))
        //.column("f0", DataTypes.STRUCTURED(User.class, DataTypes.FIELD("name", DataTypes.STRING())))
        .watermark("timeField", "timeField - INTERVAL '10' MINUTE")
        //.watermark("timeField", "SOURCE_WATERMARK()")
        .build());

//fromChangelogStream(stream)
//fromChangelogStream(stream,Schema)

// Create a view from a DataStream, just like creating a table.
// The same field-naming variants as fromDataStream are available:
tableEnv.createTemporaryView("table-name", stream);
tableEnv.createTemporaryView("table-name", stream, $("field-name1"), $("field-name2"));
tableEnv.createTemporaryView("table-name", stream, schema);

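Putting the DataStream-to-Table direction together, here is a small hedged end-to-end sketch; the element values and the derived field names are illustrative assumptions:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// A simple in-memory stream of (name, score) tuples
DataStream<Tuple2<String, Integer>> stream =
        env.fromElements(Tuple2.of("alice", 12), Tuple2.of("bob", 7));

// Convert to a Table; the physical columns f0/f1 are derived from the tuple type,
// and the schema adds a processing-time attribute on top
Table table = tableEnv.fromDataStream(
        stream,
        Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()")
                .build());

table.printSchema();  // f0, f1, plus the computed proc_time column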

Table to DataStream

//toDataStream(table)
tableEnv.toDataStream(table);
//toDataStream(table, Class)
tableEnv.toDataStream(table, User.class);
//toDataStream(table, AbstractDataType)
tableEnv.toDataStream(table, DataTypes.STRUCTURED(User.class, DataTypes.FIELD("name", DataTypes.STRING())));
//Append table, insert only
tEnv.toAppendStream(table, Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP));
tEnv.toAppendStream(table, Row.class);
tEnv.toAppendStream(table, new TupleTypeInfo<>(Types.STRING, Types.INT));
//toChangelogStream(table) supports table updates
tEnv.toChangelogStream(table);
//toChangelogStream(table, Schema) supports table updates
tEnv.toChangelogStream(table, schema);

//Retract stream: each row is paired with a Boolean flag; an update is encoded as a
//retraction (false) of the old row followed by an insertion (true) of the new row
tEnv.toRetractStream(table, Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP));
tEnv.toRetractStream(table, Row.class);
tEnv.toRetractStream(table, new TupleTypeInfo<>(Types.STRING, Types.INT));
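To see where the changelog and retract variants matter, here is a hedged end-to-end sketch (the view name, fields, and values are invented): a grouped aggregation updates rows it has already emitted, so its result must be consumed as a changelog (or retract) stream rather than an append stream:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<String, Integer>> orders =
        env.fromElements(Tuple2.of("alice", 10), Tuple2.of("alice", 5), Tuple2.of("bob", 3));

// Register the stream as a view with renamed fields
tableEnv.createTemporaryView("orders", orders, $("name"), $("amount"));

// The aggregation result is an updating table
Table totals = tableEnv.sqlQuery(
        "SELECT name, SUM(amount) AS total FROM orders GROUP BY name");

// Consume it as a changelog stream; printed rows carry change flags such as +I, -U, +U
tableEnv.toChangelogStream(totals).print();

env.execute("table-to-changelog-demo");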