1. What is the Table API & SQL

The Table API & SQL form a relational API: users can operate on data much as they would on MySQL database tables, without writing Java code to implement Flink functionality, let alone hand-tuning that code. SQL is cheap for non-programmers to learn, so a system that provides SQL support is easily adopted by users.

To summarize, the benefits of relational APIs:

  1. Relational APIs are declarative

  2. Queries can be optimized effectively

  3. Queries can be executed efficiently

  4. “Everybody knows” SQL

Flink itself is a unified batch and stream processing framework, so the Table API and SQL form its unified top-level processing API for both batch and streaming.

The Table API & SQL are a unified API layer for stream processing and batch processing, as outlined below:

  • Flink is unified at the runtime level, because it executes batch jobs as a special case of streams

  • At the API layer, Flink provides two sets of APIs: DataSet for batch and DataStream for streaming

  • The Table API is a query API embedded in the Java and Scala languages that lets us compose queries from relational operators (such as select, filter, and join) in a very intuitive way.

  • With Flink SQL, you can write SQL directly in code to express queries. Flink's SQL support is based on Apache Calcite (an open-source SQL parsing and optimization framework), which implements the SQL standard.

With both APIs, a given query has the same semantics and yields the same result regardless of whether the input is batch or streaming.
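
For example, the same word-count aggregation can be written with either API; a minimal sketch, reusing the `table` object and the registered `WordCount` view that the batch case below sets up:

    // Table API: compose relational operators on a Table object
    Table apiResult = table
            .groupBy("word")
            .select("word, frequency.sum as frequency");

    // SQL: the same query as a string, parsed and validated by Apache Calcite
    Table sqlResult = tEnv.sqlQuery(
            "SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word");

Both `apiResult` and `sqlResult` describe the same logical query, so the planner optimizes and executes them identically.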

2. Batch case implementation

  1. Implementation notes

    In batch mode, load custom data, register it as a table, count the number of occurrences of each name, and print the result. Expected output:

    WordCount jack 2
    WordCount mike 1

  2. Implementation steps

    1. Get the batch running environment

    2. Obtain the Table operating environment

    3. Load custom data source information

    4. Build external data into tables

    5. Query data in table mode

    6. Execute the task and print the result

  3. Code implementation
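
    Both snippets below operate on a WordCount POJO that the original code does not show; a minimal sketch consistent with the expected output (public fields and a no-argument constructor make it a valid Flink POJO):

    public static class WordCount {
        public String word;
        public long frequency;

        // Flink requires a public no-argument constructor for POJO types
        public WordCount() {}

        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WordCount " + word + " " + frequency;
        }
    }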

    Table API implementation:

    /**
     * Table API implementation
     */
    public static void tableApi() throws Exception {
        //1. Initialize the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        //2. Build the data source
        DataSet<WordCount> input = env.fromElements(
                new WordCount("mike", 1),
                new WordCount("jack", 1),
                new WordCount("jack", 1));

        //3. Convert the DataSet to a Table
        Table table = tEnv.fromDataSet(input);

        //4. Group by word, then select the grouping key and the summed frequency
        Table filterTable = table
                .groupBy("word")
                .select("word, frequency.sum as frequency");

        //5. Convert the result Table back to a DataSet
        DataSet<WordCount> result = tEnv.toDataSet(filterTable, WordCount.class);

        //6. Print the output
        result.print();
    }

    SQL implementation:

    /**
     * Flink SQL implementation
     */
    public static void flinkSQL() throws Exception {
        //1. Initialize the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        //2. Build the data source
        DataSet<WordCount> input = env.fromElements(
                new WordCount("mike", 1),
                new WordCount("jack", 1),
                new WordCount("jack", 1));

        //3. Register the DataSet as a temporary view
        tEnv.createTemporaryView("WordCount", input, "word, frequency");

        //4. Execute the SQL query
        Table tableSQL = tEnv.sqlQuery(
                "SELECT word, SUM(frequency) AS frequency FROM WordCount GROUP BY word");

        //5. Convert the result Table back to a DataSet
        DataSet<WordCount> result = tEnv.toDataSet(tableSQL, WordCount.class);

        //6. Print the output
        result.print();
    }

3. Stream processing case implementation

  1. Implementation notes

    In the Flink streaming environment, load two collections of data, convert them into tables, merge them with SQL (UNION ALL), and convert the result table back into a DataStream.

  2. Implementation steps

    1. Get the flow processing environment

    2. Set parallelism

    3. Obtain the Table operating environment

    4. Load collection data

    5. Convert DataStream to Table

    6. Register DataStream as a Table

    7. Use union all to associate the two tables

    8. Execute the task and print out the output

  3. Code implementation
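
    The snippet below operates on an Order POJO that the original code does not show; a minimal sketch (assumed, defined alongside the application class):

    public static class Order {
        public Long user;
        public String product;
        public int amount;

        // Flink requires a public no-argument constructor for POJO types
        public Order() {}

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return "Order " + user + " " + product + " " + amount;
        }
    }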

    StreamSqlApplication class:

    import java.util.Arrays;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class StreamSqlApplication {

        public static void main(String[] args) throws Exception {
            //1. Initialize the streaming environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            //2. Create the first order stream
            DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                    new Order(1L, "beer", 3),
                    new Order(1L, "diaper", 4),
                    new Order(3L, "rubber", 2)));

            //3. Create the second order stream
            DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                    new Order(2L, "pen", 3),
                    new Order(2L, "rubber", 3),
                    new Order(4L, "beer", 1)));

            //4. Convert the first DataStream to a Table
            Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");

            //5. Register the second DataStream directly as a table
            tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

            //6. Use UNION ALL to combine the two tables
            Table result = tEnv.sqlQuery("SELECT * FROM " + tableA +
                    " UNION ALL " +
                    " SELECT * FROM OrderB ");

            //7. Convert the result Table to an append stream and print it
            tEnv.toAppendStream(result, Order.class).print().setParallelism(1);

            //8. Execute the job
            env.execute();
        }
    }

4. Flink 1.9 Table architecture

In the new architecture, there are two query processors:

  1. Flink Query Processor, also known as the Old Planner

  2. Blink Query Processor, also known as Blink Planner

The query processor is the concrete implementation of the planner: through its parsing, optimization, and code-generation (CodeGen) phases, it transforms a Table API & SQL job into a Transformation DAG that the Flink runtime can recognize; the Flink runtime then schedules and executes the job.

Flink's query processor has separate branches for stream computing and batch processing: the underlying API for streaming is DataStream, while for batch it is the DataSet API.

The Blink query processor implements a unified stream/batch interface whose underlying API is Transformation, which means it no longer depends on the DataSet API at all.
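
You can see these planner phases at work by asking the planner to explain a query. A minimal sketch, reusing the `tEnv` and `filterTable` from the batch example above:

    // Prints the planner's view of the query: the abstract syntax tree,
    // the optimized logical plan, and the physical execution plan
    System.out.println(tEnv.explain(filterTable));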

5. The difference between Flink Planner and Blink Planner

Starting with Flink 1.9, two different planner implementations are offered: the Blink planner and the old planner that predates 1.9. The planner is responsible for converting relational operators into an optimized Flink job that Flink can execute. The two planners have different optimization rules and runtime classes.

  1. From the model perspective, the old Flink planner does not unify stream computing and batch processing: at the bottom layer, jobs are converted to the DataStream API and the DataSet API respectively. The Blink planner, by contrast, treats batch data sets as bounded streams, so both stream computing jobs and batch jobs are ultimately converted to Transformations.

  2. From an architectural perspective, the Blink planner implements a BatchPlanner and a StreamPlanner for batch processing and stream computing that share most of their code and optimization logic. The old planner implements batch and stream as two completely separate code paths, with essentially no reuse of code or optimization logic.

In addition to its model and architecture advantages, the Blink planner has accumulated many practical features from the massive business scenarios inside Alibaba Group, focusing on three aspects:

  1. The Blink planner improves the code-generation mechanism, optimizes a number of operators, and provides rich, practical new features such as dimension-table join, Top-N, MiniBatch, streaming deduplication, and data-skew optimization for aggregation scenarios.

  2. The Blink planner's optimization strategy is based on common subgraphs and includes two approaches: cost-based optimization (CBO) and rule-based optimization (RBO), which makes optimization more comprehensive. The Blink planner also supports obtaining data-source statistics from the catalog, which is important for CBO.

  3. The Blink planner offers more built-in functions and broader standard SQL support.

Overall, the Blink query processor is architecturally more advanced and functionally more complete. For stability reasons, Flink 1.9 still uses the Flink (old) planner by default; users who want the Blink planner can specify it explicitly in the job, as shown below.

6. Flink Planner

  1. Import the dependency

    <!-- Table program planner and runtime. This was the only planner of Flink prior to version 1.9. Still recommended. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. Code snippet

    Streaming query:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    // Flink stream query
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
    // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings)
    

    Batch data query:

    // Import packages
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    // Flink batch query
    ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

7. Blink Planner

  1. Import the dependency

    <!-- Import this package when using the Blink planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. Code snippet

    Streaming query:

    // Import packages
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    // Flink stream query
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
    

    Batch data query:

    // Flink batch query
    EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

    Note: If the job will run in a cluster environment, set the scope of the Blink planner dependency to provided when packaging, to indicate that the cluster environment supplies it. The Flink distribution already bundles the Blink planner dependencies, so they should not be re-introduced in the job jar, to avoid conflicts. For example:
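
    A sketch of the same dependency from above with provided scope:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- Provided by the cluster's Flink distribution, so it is excluded from the job jar -->
        <scope>provided</scope>
    </dependency>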


This article was created and shared by Mirson. For further communication, please join QQ group 19310171 or visit www.softart.cn