Author: Cheng Hequn (Military Commander)

**Article overview:** This article consists of three parts. The first part introduces what the Table API is, analyzing it at the conceptual level to build an intuitive understanding. The second part shows how to use the Table API at the code level. The third part covers recent developments around the Table API. The structure of the article is as follows:

  • What is the Table API
    • Flink API overview
    • Table API features
  • Table API programming
    • WordCount example
    • Table API operations
      • How to obtain a Table
      • How to output a Table
      • How to query a Table
  • Recent Table API developments

1. What is the Table API

To better understand the Table API, let’s take a look at what Flink provides for users to use.

1. Flink API overview

As shown in the figure, Flink provides three layers of APIs that trade off ease of use against expressiveness. From top to bottom, expressiveness gradually increases. For example, ProcessFunction, the lowest-level API, is the most expressive and can be used to implement complex logic involving state, timers, and other low-level primitives. The DataStream API further encapsulates ProcessFunction and provides many operators with standard semantics, such as the common window operators (Tumble, Slide, Session, etc.). SQL and the Table API sit at the top: they are the most convenient to use and have many characteristics of their own, which can be summarized as follows:

First, the Table API & SQL is a declarative API. Users only care about what to do, not how to do it. For example, in the WordCount example, users only need to care about which dimension to aggregate on and which kind of aggregation to perform, not about the underlying implementation.

Second, high performance. The Table API & SQL is backed by an optimizer that optimizes queries. For example, if the WordCount example contained two identical count operations, the optimizer would recognize this and avoid the duplicate computation, keeping only one count operation and outputting its value twice, for better performance.

Third, unified stream and batch processing. As you can see from the example above, the query itself does not distinguish between stream and batch: the same query can run over both streams and batches, which saves the business from developing two sets of code.

Fourth, a stable standard. The Table API & SQL follows the SQL standard, which does not change easily. The advantage of a stable API is that you do not have to worry about compatibility between versions.

Fifth, easy to understand. The semantics are clear, and what you see is what you get.

2. Table API features

The previous section introduced some characteristics that the Table API and SQL have in common. This section focuses on features of the Table API itself, which can be summarized in the following two points:

First, the Table API makes multi-statement data processing easier to write, as the example below shows.

How should we understand this? Suppose we have a table tab on which we need to perform some filtering and output the result to resultTable1. The corresponding implementation is tab.where("a < 10").insertInto("resultTable1"). In addition, we need to apply a different filter and output that result as well: tab.where("a > 100").insertInto("resultTable2"). You'll find that with the Table API this is very simple to write, taking just two lines of code.
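
Putting this together as a minimal sketch (assuming a source table sourceTable with a column a, and that resultTable1 and resultTable2 have already been registered as table sinks):

Table tab = tEnv.scan("sourceTable");
// Two declarations over the same input, each writing to its own sink.
tab.where("a < 10").insertInto("resultTable1");
tab.where("a > 100").insertInto("resultTable2");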

Second, the Table API is Flink's own API, which makes it easier to extend beyond standard SQL. Of course, extensions are not added haphazardly: the semantics, atomicity, and orthogonality of the API are taken into account, and a new operation is added if and only if it is needed.

Compared with SQL, we can think of the Table API as a superset of SQL: the Table API can do everything SQL can do, while also extending and improving on SQL from the perspectives of usability and functionality.

2. Table API programming

Chapter 1 introduced the concepts behind the Table API. In this chapter we look at how to program with it, starting with a WordCount example to give you a general feel for Table API programming, and then covering Table operations: how to obtain a Table, how to output a Table, and how to perform query operations on a Table.

1. WordCount example

Below is a complete batch version of WordCount written in Java. The Scala and streaming versions have also been uploaded to GitHub (github.com/hequn8128/T… ), so you can download them, try running them, or modify them.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {   // line:10

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");  // line:20

        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        tEnv.toDataSet(result, Row.class).print();
    }
}

Let's walk through this WordCount example. First, lines 13 and 14 initialize the environment: ExecutionEnvironment.getExecutionEnvironment() obtains the execution environment, and BatchTableEnvironment.create() is then used to obtain the Table environment. With the Table environment in hand, you can register TableSources and TableSinks or perform other operations.

It is important to note that both ExecutionEnvironment and BatchTableEnvironment here are the Java versions; a Scala application needs the Scala versions of these environments. This is a common stumbling block for beginners, because there are many environments and they are easy to confuse. To help distinguish them, the environments are summarized below.

Environments are classified along two dimensions, batch/stream and Java/Scala, so you need to pay special attention to which one you use. The community has discussed this environment problem (see cwiki.apache.org/confluence/…), so I will not elaborate on it here.
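
For example, a streaming Java application would create its environments as in the following minimal sketch (based on the Flink 1.8-era API used in this article; the Scala counterparts live under the corresponding ...api.scala packages):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Streaming counterparts of the ExecutionEnvironment / BatchTableEnvironment
// pair used in the batch WordCount example above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);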

Going back to the WordCount example, once we have the environment, the second thing to do is to register the corresponding TableSource:

tEnv.connect(new FileSystem().path(path))
    .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
    .withSchema(new Schema().field("word", Types.STRING))
    .registerTableSource("fileSource");

This is also very convenient to use. First, since we want to read a file, we need to specify its path. We then describe the format of the file contents, for example that it is a CSV file and what the line delimiter is. Next, we specify the Schema the file corresponds to, here a single column named word of type String. Finally, we register the TableSource with the environment under the name fileSource.

Table result = tEnv.scan("fileSource")
    .groupBy("word")
    .select("word, count(1) as count");

tEnv.toDataSet(result, Row.class).print();

By scanning this TableSource we obtain a Table object, on which we can then perform operations such as groupBy and count. Finally, the Table can be output as a DataSet.

This is a complete WordCount example using the Table API. It involves obtaining a Table, operating on a Table, and outputting a Table. The following sections explain how to obtain a Table, how to output a Table, and how to operate on a Table.

2. How to obtain a Table

Obtaining a Table takes two steps: first, register a TableSource; second, call the scan method of the TableEnvironment to obtain a Table object. There are three ways to register a TableSource: through a table descriptor, through a custom TableSource, or through a DataStream. The specific registration methods are shown in the figure below:
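
The following minimal sketch shows all three registration styles in Java (based on the Flink 1.8-era API; the table names, file path, and sample data are illustrative assumptions):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sources.CsvTableSource;

public class RegisterTableSourceExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 1. Table descriptor: describe connector, format, and schema, then register.
        tEnv.connect(new FileSystem().path("/tmp/words.txt"))
            .withFormat(new OldCsv().field("word", Types.STRING))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("descriptorSource");

        // 2. TableSource instance: register a custom or built-in TableSource directly.
        CsvTableSource csvSource = new CsvTableSource(
            "/tmp/words.txt",
            new String[]{"word"},
            new TypeInformation<?>[]{Types.STRING});
        tEnv.registerTableSource("customSource", csvSource);

        // 3. DataStream: register an existing stream as a table with named fields.
        DataStream<String> words = env.fromElements("flink", "table", "api");
        tEnv.registerDataStream("streamSource", words, "word");
    }
}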

3. How to output a Table

For outputting a Table, there are likewise three methods: through a table descriptor, through a custom TableSink, or by converting to a DataStream. As shown below:
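
A matching sketch of the three output methods, reusing tEnv from the sketch above and assuming a previously computed Table named result (sink names and file paths are illustrative; CsvTableSink comes from org.apache.flink.table.sinks and Row from org.apache.flink.types):

// 1. Through a table descriptor: register a sink, then insert the Table into it.
tEnv.connect(new FileSystem().path("/tmp/result1.csv"))
    .withFormat(new OldCsv().field("word", Types.STRING))
    .withSchema(new Schema().field("word", Types.STRING))
    .registerTableSink("descriptorSink");
result.insertInto("descriptorSink");

// 2. Through a TableSink instance (here the built-in CsvTableSink).
CsvTableSink csvSink = new CsvTableSink("/tmp/result2.csv", ",");
tEnv.registerTableSink("customSink",
    new String[]{"word"},
    new TypeInformation<?>[]{Types.STRING},
    csvSink);
result.insertInto("customSink");

// 3. By converting the Table back to a DataStream
//    (use toRetractStream instead for continuously updating results such as aggregations).
DataStream<Row> rows = tEnv.toAppendStream(result, Row.class);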

4. How to operate a Table

4.1 Table Operations overview

Sections 2 and 3 described how to obtain and output a Table; this section describes how to operate on one. A Table supports many operations: projection and filtering operations such as select, filter, and where; aggregation operations such as groupBy and flatAggregate; join operations; and so on. Let's take a concrete example to look at how the operations convert between Table types.

As shown above, when we have a Table and call groupBy on it, we get back a GroupedTable. A GroupedTable only offers the select method, and calling select on it returns a Table, on which we can then call further Table methods. The other table types in the figure, such as OverWindowedTable, follow a similar pattern. It is worth noting that these intermediate table types exist to ensure the legality and convenience of the API: after groupBy, for example, only select is meaningful, and an IDE can offer it directly via auto-completion.
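
A small sketch of these intermediate types (reusing the fileSource table from the WordCount example; the alias cnt is illustrative):

import org.apache.flink.table.api.GroupedTable;
import org.apache.flink.table.api.Table;

Table tab = tEnv.scan("fileSource");                     // a plain Table
GroupedTable grouped = tab.groupBy("word");              // GroupedTable: only select() is offered
Table counted = grouped.select("word, count(1) as cnt"); // select() returns a plain Table again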

As mentioned earlier, we can think of the Table API as a superset of SQL, so the operations on a Table can also be classified along those lines, roughly into three categories, as shown below:

The first category consists of operations aligned with SQL, such as select, filter, and join. The second category consists of operations that improve the usability of the Table API. The third category consists of operations that enhance the functionality of the Table API. The first category is similar to SQL and easy to understand, and the specific methods are covered in the official documentation, so it is not introduced here. The following sections focus on the latter two categories, which are unique to the Table API.

4.2 Operations related to ease of use

Before we get into ease of use, let's look at a problem first. **Suppose we have a large table with 100 columns and need to remove one of them. How should the select statement be written?** We would have to select the remaining 99 columns! Obviously, that is a burden on users. To solve this problem, we introduced a dropColumns method on Table, with which we only need to name the columns to drop. Correspondingly, the addColumns, addOrReplaceColumns, and renameColumns methods were also introduced, as shown in the figure below:
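
A minimal sketch of these column operations (available since the Flink 1.9-era Table API; the table and column names are illustrative):

Table t = tEnv.scan("wideTable");
Table withoutB = t.dropColumns("b");                             // every column except b
Table added = t.addColumns("concat(a, '_x') as a2");             // append a derived column
Table replaced = t.addOrReplaceColumns("concat(a, '_x') as a");  // like addColumns, but overwrites on name clash
Table renamed = t.renameColumns("a as a1, b as b1");             // rename existing columns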

With that covered, let's look at another problem: **Suppose we have a table with 100 columns and need to select columns 20 through 80. What do we do?** To solve this, we introduced the withColumns and withoutColumns methods, with which the problem above can be written simply as table.select("withColumns(20 to 80)").
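
And a sketch of the range selection, under the same assumptions as above:

Table middle = t.select("withColumns(20 to 80)");    // columns 20 through 80
Table others = t.select("withoutColumns(20 to 80)"); // every column except 20 through 80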

4.3 Operations related to enhanced functionality

This section describes the functionality and usage of TableAggregateFunction. Before TableAggregateFunction was introduced, Flink had three kinds of user-defined functions: ScalarFunction, TableFunction, and AggregateFunction. We can classify these functions by their input and output dimensions. As shown below, a ScalarFunction takes one row of input and produces one row of output; a TableFunction takes one row of input and produces multiple rows of output; an AggregateFunction takes multiple rows of input and produces one row of output. To make the semantics complete, the Table API added TableAggregateFunction, which can take multiple rows of input and produce multiple rows of output. With TableAggregateFunction, the functionality of the Table API can be greatly extended, and it can be used to implement custom operators to some extent. For example, we can use a TableAggregateFunction to implement TopN.

TableAggregateFunction is also very simple to use; the method signatures and usage are shown below:

To use it, we simply call table.flatAggregate() and pass in an instance of a TableAggregateFunction. Users can extend TableAggregateFunction to implement custom functions. When doing so, they define an Accumulator to hold the aggregation state and implement the accumulate and emitValue methods on the custom TableAggregateFunction: accumulate processes the input data, while emitValue outputs results based on the state in the accumulator.
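
As a concrete illustration, here is a minimal Top2 sketch in the spirit of the example in the Flink documentation (the class name, function name, and column names are illustrative assumptions):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

// Emits the two largest values per group, together with their rank.
public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2.Top2Accum> {

    public static class Top2Accum {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
    }

    @Override
    public Top2Accum createAccumulator() {
        return new Top2Accum();
    }

    // Called once per input row; keeps the two largest values seen so far.
    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    // Emits multiple rows per group: (value, rank).
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

It is then registered and called through flatAggregate, for example:

tEnv.registerFunction("top2", new Top2());
Table ranked = tab
    .groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");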

3. Recent Table API developments

Finally, let's look at what has been happening recently around the Table API:

1. FLIP-29

This mainly covers enhancements to the functionality and ease of use of the Table API, for example the column operations and TableAggregateFunction introduced above.

The corresponding community JIRA is: issues.apache.org/jira/browse…

2. Python Table API

This aims to add Python language support to the Table API, which should be good news for Python users.

The corresponding community JIRA is: issues.apache.org/jira/browse…

3. Interactive Programming

That is, the Table API will provide a cache operator that caches the result of a Table so that further operations can be performed on the cached result. The corresponding community JIRA is: issues.apache.org/jira/browse…

4. Iterative Processing

The Table API will support an iterator operator for iterative computation, for example iterating 100 times or until a convergence condition is met, which is widely used in machine learning. The corresponding community JIRA is: issues.apache.org/jira/browse…

