1. The overall implementation process of Flink Table API

The main operation process is as follows:

// Create the execution environment for the table
val tableEnv = ...     

// Create a table for reading data
tableEnv.connect(...).createTemporaryTable("inputTable")

// Register a table to output the calculated results
tableEnv.connect(...).createTemporaryTable("outputTable")

// Use the Table API query operator to get a result Table
val result = tableEnv.from("inputTable").select(...)

// Use SQL query to get a result table
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")

// Write the result table to the output table
result.insertInto("outputTable")


2. Create and configure the stream processing execution environment

  1. Create table environment

    Based on the stream execution environment, call the create method directly:

    val tableEnv = StreamTableEnvironment.create(env)

    TableEnvironment is the core concept for integrating the Table API & SQL in Flink. It is responsible for:

    • Registering catalogs

    • Registering tables in the internal catalog

    • Executing SQL queries

    • Registering user-defined functions

    • Converting a DataStream or DataSet to a Table (see the sketch after this list)

    • Holding references to the ExecutionEnvironment or StreamExecutionEnvironment
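
    As a rough sketch of the conversion and registration steps above (Java; "dataStream" is an assumed existing DataStream<Tuple2<Integer, String>> and MyUpperCase is a hypothetical ScalarFunction, neither of which appears in the original code):

    // Convert the DataStream to a Table with named fields (old string-expression style)
    Table userTable = tableEnv.fromDataStream(dataStream, "id, name");

    // Register the Table in the internal (default) catalog as a temporary view
    tableEnv.createTemporaryView("users", userTable);

    // Register a user-defined scalar function (MyUpperCase is hypothetical)
    tableEnv.registerFunction("toUpper", new MyUpperCase());

    // Query the registered view with SQL
    Table upperNames = tableEnv.sqlQuery("SELECT id, toUpper(name) FROM users");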

  2. Environment configuration

    When creating a TableEnvironment, you can configure its features with a few parameters.

    • Streaming query configuration for the old planner

      EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useOldPlanner()      // Use the old planner
        .inStreamingMode()    // Stream processing mode
        .build();
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
      
    • Batch configuration for the old planner

      // The old planner's batch mode is based on the DataSet API, so it uses
      // ExecutionEnvironment and BatchTableEnvironment instead of EnvironmentSettings
      ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
      BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);
    • Streaming configuration for the Blink planner

      EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()    // Use the Blink planner
        .inStreamingMode()    // Stream processing mode
        .build();
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
    • Batch configuration for the Blink planner

      EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inBatchMode().build();
      TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

3. Catalog operations and usage

1) Catalog types:

  1. GenericInMemoryCatalog: the built-in Catalog. Its name is default_catalog, and its default database is default_database. By default, tables registered with TableEnvironment#registerTable are registered in this Catalog.

  2. User-defined Catalog: a custom Catalog, such as the HiveCatalog provided by flink-connector-hive.

Note:

The metadata object names in the GenericInMemoryCatalog are case sensitive. HiveCatalog stores all metadata object names in lower case.

The default Catalog is default_catalog, and the default Database is default_database.
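
As a minimal illustration (the table name inputTable is an assumption here; it stands for a temporary table registered as in the later sections), a table can also be referenced by its fully qualified catalog.database.table path:

    // Reference a table by its fully qualified path <catalog>.<database>.<table>
    // "inputTable" is an assumed, already-registered temporary table
    Table fullPathTable = tableEnv.sqlQuery("SELECT * FROM default_catalog.default_database.inputTable");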

2) Catalog operations

  1. Get the currently used Catalog

    tableEnv.getCurrentCatalog()
  2. Get the Database currently in use

    tableEnv.getCurrentDatabase()
  3. Register a custom Catalog

    tableEnv.registerCatalog("custom-catalog", new CustomCatalog("customCatalog"))
  4. List all catalogs

    tableEnv.listCatalogs()
  5. List all databases

    tableEnv.listDatabases()
  6. Switch the Catalog

    tableEnv.useCatalog("catalogName")
  7. Switch the Database

    tableEnv.useDatabase("databaseName")

4. Implementing a file system read operation (CSV)

  1. POM dependencies

    <!-- Import the CSV descriptor dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. Code implementation

    public static void main(String[] args) throws Exception {
        //1. Create a stream running environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Without specifying EnvironmentSettings, the old planner's streaming query mode is used by default
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
    
        //3. Specify a path to read the CSV file
        String filePath = "./data/user.csv";
    
        //4. Read the CSV file and configure the attribute type
        tabEnv.connect(new FileSystem().path(filePath)) // Read file data from the specified path; the argument must be an implementation of ConnectorDescriptor
        .withFormat(new Csv()) // Define the format used to read the external file; the argument must extend the abstract class FormatDescriptor
        .withSchema(new Schema()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
        )// Define the table structure
        .createTemporaryTable("inputTable");
    
        System.out.println(tabEnv.getCurrentCatalog());
        System.out.println(tabEnv.getCurrentDatabase());
        //5. Convert the table data into a Table object
        Table inputTable = tabEnv.from("inputTable");
    
        //6. Print test
        tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);
    
        env.execute();
    }
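    For reference, a hypothetical ./data/user.csv consistent with the sample output shown in section 6 (the actual file contents are not included in the original text; the names for id 2 are made up) might look like:

    1,zhangsan
    2,lisi
    2,wangwu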

5. Implementing a message queue read operation (Kafka)

  1. POM dependencies

    <!-- Kafka connector dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <!-- Flink JSON serialization dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. Code implementation

    public static void main(String[] args) throws Exception {
        // Create a streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without specifying EnvironmentSettings, the old planner's streaming query mode is used by default
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
    
        // Use the Kafka connector to consume data
        tabEnv.connect(new Kafka() // Read data from Kafka
                .version("universal") // Specify the Kafka version: "0.8", "0.9", "0.10", "0.11", or "universal"
                .topic("rate")  // Specify the topic to consume
                .property("zookeeper.connect", "10.10.20.15:2181") // Specify the ZooKeeper cluster address
                .property("bootstrap.servers", "10.10.20.15:9092") // Specify the Kafka cluster address
        ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("timestamp", DataTypes.BIGINT())
                        .field("type", DataTypes.STRING())
                        .field("rate", DataTypes.INT())
                ).createTemporaryTable("RateInputTable");
    
        Table rateInputTable = tabEnv.from("RateInputTable");
    
        tabEnv.toAppendStream(rateInputTable, Rate.class).print();
    
        env.execute();
    }
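    The Rate class used in toAppendStream above is not shown in the original text. A minimal POJO sketch matching the declared schema (timestamp BIGINT, type STRING, rate INT) might look like the following; the field names and types are assumptions derived from the schema, and the class is nested because section 7.2 refers to it as StreamOutputKafkaApplication.Rate:

    // Hypothetical POJO matching the RateInputTable schema; not part of the original text
    public static class Rate {
        public Long timestamp;   // event timestamp, BIGINT in the schema
        public String type;      // currency type, STRING in the schema
        public Integer rate;     // rate value, INT in the schema

        public Rate() {}         // Flink POJOs require a public no-arg constructor

        @Override
        public String toString() {
            return "Rate{timestamp=" + timestamp + ", type='" + type + "', rate=" + rate + "}";
        }
    }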
  3. Consumer test

    Start a Kafka console producer to send test data:

    bin/kafka-console-producer.sh --broker-list 10.10.20.15:9092 --topic rate

    Sending data:

    1618388392479,'REF',9
    1618388392480,'USD',4
    1618388392580,'HKD',9

6. How to perform conditional queries

6.1 Implementation of the Table API

The Table API is a query API integrated into the Scala and Java languages. Unlike SQL, Table API queries are not expressed as strings; instead, they are built up through step-by-step method calls in the host language.

The Table API is based on the Table class, which represents a table and provides a full set of methods for transformation operations. These methods return a new Table object representing the result of applying the transformation to the input Table. Some relational transformations consist of multiple chained method calls, for example table.select(...).filter(...), where select(...) selects the specified fields from the table and filter(...) applies a filter condition.

Code implementation:

...
// Query and transform data with the Table API: the registered temporary table must first be read into a Table object before it can be operated on
Table resultTable = inputTable.filter("id == 1").select("id,name");

// Use TableAPI to aggregate the Table object
Table aggResultTable = inputTable.groupBy("id").select("id,id.count as count");

// Print tests
tabEnv.toAppendStream(resultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print("resultTable>>>").setParallelism(1);
tabEnv.toRetractStream(aggResultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("aggResultTable>>>").setParallelism(1);
...

Output result:

resultTable>>>> (1,zhangsan)
aggResultTable>>>> (true,(2,1))
aggResultTable>>>> (false,(2,1))
aggResultTable>>>> (true,(2,2))
aggResultTable>>>> (true,(1,1))

Here true represents newly inserted data and false represents a retraction of a previously emitted record; after (2,1) is retracted, the updated cumulative count (true,(2,2)) is printed.

6.2 Implementation of SQL

Flink's SQL integration is based on Apache Calcite, which implements the SQL standard. In Flink, SQL query statements are defined as regular strings. The result of a SQL query is a new Table.

// Use SQL to manipulate table data
Table resultTableBySQL = tabEnv.sqlQuery("select id,count(id) as cnt from inputTable group by id");
tabEnv.toRetractStream(resultTableBySQL, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("sql result>>>").setParallelism(1);

7. Implementing data output operations

Output data is written to a TableSink. TableSink is a generic interface that supports different file formats, storage databases, and message queues.

The most direct way to output a Table is to use Table.executeInsert() to write it to a registered TableSink.
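
As an alternative sketch (assuming a Flink version in which TableEnvironment#executeSql is available; the table names reuse those from the example in section 7.1), the same insert can also be expressed as a SQL statement:

// Equivalent INSERT expressed as SQL; "inputTable" and "outputTable" are the tables registered in 7.1
tabEnv.executeSql("INSERT INTO outputTable SELECT id, name FROM inputTable");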

7.1 Output to files

Code implementation:

//1. Create a stream running environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. Without specifying EnvironmentSettings, the old planner's streaming query mode is used by default
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

env.setParallelism(1);

//3. Specify a path to read the CSV file
String filePath = "./data/user.csv";

//4. Read the CSV file and configure the attribute type
tabEnv.connect(new FileSystem().path(filePath)) // Read file data from the specified path; the argument must be an implementation of ConnectorDescriptor
 .withFormat(new Csv()) // Define the format used to read the external file; the argument must extend the abstract class FormatDescriptor
 .withSchema(new Schema()
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())
            )// Define the table structure
 .createTemporaryTable("inputTable");

//5. Convert the table data into a Table object
Table inputTable = tabEnv.from("inputTable");

Table resultTable = inputTable.select("id,name");

// Define the result table to write file data to the result file
tabEnv.connect(new FileSystem().path("./data/user.log"))
 .withFormat(new Csv())
 .withSchema(new Schema() // This method must be specified
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())
            )
 .createTemporaryTable("outputTable");

resultTable.executeInsert("outputTable");

//6. Print test
tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);

env.execute();

7.2 Output to Kafka

Read data from Kafka and write the result back to a Kafka message queue. Code implementation:

String kafkaNode = "10.10.20.15:2181";
String kafkaNodeServer = "10.10.20.15:9092";
// Create a streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without specifying EnvironmentSettings, the old planner's streaming query mode is used by default
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

// Use the Kafka connector to consume data
tabEnv.connect(new Kafka() // Read data from Kafka
        .version("universal") // Specify the Kafka version: "0.8", "0.9", "0.10", "0.11", or "universal"
        .startFromEarliest()
        .topic("rate")  // Specify the topic name to consume
        .property("zookeeper.connect", kafkaNode) // Specify the cluster address of ZooKeeper
        .property("bootstrap.servers", kafkaNodeServer) // Specify the cluster address to consume Kafka
).withFormat(new Csv())
        .withSchema(new Schema()
                .field("timestamp", DataTypes.BIGINT())
                .field("type", DataTypes.STRING())
                .field("rate", DataTypes.INT())
        ).createTemporaryTable("RateInputTable");

Table rateInputTable = tabEnv.from("RateInputTable");

// Register the Kafka connector used as the output (sink) table
tabEnv.connect(new Kafka() // Write data to Kafka
        .version("universal") // Specify the Kafka version: "0.8", "0.9", "0.10", "0.11", or "universal"
        .topic("output_rate")  // Specify the topic to write to
        .property("zookeeper.connect", kafkaNode) // Specify the ZooKeeper cluster address
        .property("bootstrap.servers", kafkaNodeServer) // Specify the Kafka cluster address
).withFormat(new Csv())
        .withSchema(new Schema()
                .field("timestamp", DataTypes.BIGINT())
                .field("type", DataTypes.STRING())
                .field("rate", DataTypes.INT())
        ).createTemporaryTable("RateOutputTable");

// Write table data to the Kafka message queue
rateInputTable.executeInsert("RateOutputTable");

// Prints data information
tabEnv.toAppendStream(rateInputTable, StreamOutputKafkaApplication.Rate.class).print();

env.execute();
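To verify the sink, a Kafka console consumer can be started on the output topic (a hedged example reusing the broker address from the code above):

bin/kafka-console-consumer.sh --bootstrap-server 10.10.20.15:9092 --topic output_rate --from-beginning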

This article was created and shared by Mirson. For further communication, please add to QQ group 19310171 or visit www.softart.cn