Author: He Xiaoling (Xiao Ling)

This article was shared by Alibaba technology expert He Xiaoling and compiled by community volunteer Zheng Zhongni. Based on Flink 1.9, it explains the principles behind Flink SQL and some of the functional changes in Flink 1.9 from a user's perspective, in the hope that it will be helpful to everyone. The content is divided into the following three parts:

  1. The design and use of TableEnvironment
  2. Catalog design and DDL practices
  3. Several important improvements and optimizations of Blink Planner

TableEnvironment

FLIP-32 proposed fully open-sourcing Blink and merging it into Flink's master branch. After the merge, Flink 1.9 has two planners: Flink Planner and Blink Planner.

In previous versions, Flink Table was a second-class citizen within Flink. Because Flink SQL is easy to use, has a low barrier to entry, and is attracting more and more attention, the Flink Table module has been promoted to a first-class citizen. From the start, Blink was designed with unified stream and batch processing in mind: batch is only a special case of stream, so the same TableEnvironment can be used to express both.

TableEnvironment overall design

Figure 1 Overall design of the new TableEnvironment

From Figure 1, the TableEnvironment consists of the following components:

  • flink-table-common: This package contains the code shared by Flink Planner and Blink Planner.
  • flink-table-api-java: The API for user programming; it contains most of the API.
  • flink-table-api-scala: A very thin layer that only covers the Expression and DSL parts of the Table API.
  • Two planners: flink-table-planner and flink-table-planner-blink.
  • Two bridges: flink-table-api-scala-bridge and flink-table-api-java-bridge. As the figure shows, both Flink Planner and Blink Planner depend on the Java API. Through the bridges, API operations can be converted to a Scala DataStream or DataSet, or to a Java DataStream or DataSet.

Compare the old and new TableEnvironments

Before Flink 1.9, the Flink Table module had seven environments, which were relatively difficult to use and maintain: StreamTableEnvironment and BatchTableEnvironment, each in a Java and a Scala version (four in total), plus three parent classes, for a total of seven.

Under the new framework, the community wanted to unify stream and batch, so the original design was streamlined. First, a unified TableEnvironment is provided in the flink-table-api-java package. Then, the bridges provide two StreamTableEnvironments, for bridging to the Scala DataStream and the Java DataStream. Finally, because Flink Planner still supports operations such as toDataSet(), BatchTableEnvironment (again in a Java and a Scala version) is retained for the time being. Thus, there are currently five TableEnvironments.

Because Flink Planner will be removed in the future, BatchTableEnvironment will then be dropped, making the overall TableEnvironment design more concise and clear.

Application of the new TableEnvironment

This section describes the scenarios each new TableEnvironment fits and their restrictions, as shown below:

Figure 2 Applicable scenarios of the new TableEnvironment

For simplicity, the new TableEnvironment in the first row is called UnifyTableEnvironment in the rest of this article. In Blink, batch is considered a special case of stream, so Blink Batch can use UnifyTableEnvironment.

UnifyTableEnvironment has some limitations in 1.9. For example, it cannot register UDAFs and UDTFs, because type inference in the new type system is not yet complete (Java and Scala type inference are not unified), so this functionality is not supported for the time being. In addition, UnifyTableEnvironment cannot be converted to or from DataStream and DataSet.

In the second row, StreamTableEnvironment supports conversion to DataStream, as well as registering UDAFs and UDTFs. Functions written in Java are registered with the Java StreamTableEnvironment; functions written in Scala are registered with the Scala StreamTableEnvironment.

Note that Blink Batch jobs do not support StreamTableEnvironment, because batch tables currently cannot be converted to or from DataStream, so semantics such as toDataStream() are not supported. The figure also shows that Blink Batch can currently only use TableEnvironment.

In the last row, BatchTableEnvironment can be converted to a DataSet using toDataSet().

Figure 2 clearly shows what each TableEnvironment can do and what limitations it has.

The following examples illustrate each case.
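To make the rules from Figure 2 and Examples 1-4 easy to check at a glance, here is a small Java sketch that encodes them as data. It is a documentation aid, not part of Flink: the enum and method names (`PlannerKind`, `ExecMode`, `EnvKind`, `EnvMatrix.allowed`) are invented for illustration, and the rules are only those stated in this article for Flink 1.9.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical names; these enums are NOT Flink classes.
enum PlannerKind { BLINK, OLD }

enum ExecMode { BATCH, STREAMING }

enum EnvKind { TABLE_ENVIRONMENT, STREAM_TABLE_ENVIRONMENT, BATCH_TABLE_ENVIRONMENT }

final class EnvMatrix {
    private EnvMatrix() {}

    /** Which TableEnvironment flavours this article says can be used for a planner/mode pair in 1.9. */
    static Set<EnvKind> allowed(PlannerKind planner, ExecMode mode) {
        if (mode == ExecMode.STREAMING) {
            // Blink Stream and Flink Stream: unified env or the StreamTableEnvironment bridge.
            return EnumSet.of(EnvKind.TABLE_ENVIRONMENT, EnvKind.STREAM_TABLE_ENVIRONMENT);
        }
        if (planner == PlannerKind.BLINK) {
            // Blink Batch cannot convert to/from DataStream, so only the unified env works.
            return EnumSet.of(EnvKind.TABLE_ENVIRONMENT);
        }
        // Flink (old) planner in batch mode: BatchTableEnvironment, which offers toDataSet().
        return EnumSet.of(EnvKind.BATCH_TABLE_ENVIRONMENT);
    }

    /** The unified TableEnvironment cannot be converted to or from DataStream/DataSet in 1.9. */
    static boolean canConvertToDataStreamOrDataSet(EnvKind env) {
        return env != EnvKind.TABLE_ENVIRONMENT;
    }
}
```

For example, `EnvMatrix.allowed(PlannerKind.BLINK, ExecMode.BATCH)` contains only `TABLE_ENVIRONMENT`, which is the restriction illustrated by Example 1.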

Example 1: Blink Batch

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// tEnv...
tEnv.execute("job name");

As shown in Figure 2, Blink Batch can only use TableEnvironment (that is, UnifyTableEnvironment). In the code, an EnvironmentSettings object is created first, specifying both Blink Planner and batch mode. Blink Planner must be specified because Flink 1.9 places both the Flink Planner and the Blink Planner JARs in Flink's lib directory. If you do not specify which planner to use, the framework cannot tell which one is required, so it must be specified explicitly. Of course, if there is only one planner JAR in lib, there is no need to specify the planner explicitly.

Also note that with UnifyTableEnvironment the user cannot access the ExecutionEnvironment, so the job cannot be started with ExecutionEnvironment.execute() after it has been defined. Instead, the job must be started explicitly with TableEnvironment.execute(), which is quite different from how jobs were started before.

Example 2: Blink Stream

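import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment execEnv = ...;
StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, settings);
// tEnv...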

Blink Stream can use either UnifyTableEnvironment or StreamTableEnvironment, and the setup is basically the same as in batch mode: just replace inBatchMode with inStreamingMode.

Example 3: Flink Batch

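import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment execEnv = ...;
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
// tEnv...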

This is unchanged from previous versions, so it is not covered in detail here.

Example 4: Flink Stream

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// tEnv...
tEnv.execute("job name");

Flink Stream also supports both UnifyTableEnvironment and StreamTableEnvironment, except that when specifying the planner you need to call useOldPlanner, which selects Flink Planner. Because Flink Planner is going to be removed, it was deliberately named OldPlanner, and with it only inStreamingMode is available, not inBatchMode.

The Catalog and DDL

The new Catalog API was mainly proposed in FLIP-30. The previous ExternalCatalog will be deprecated: Blink Planner no longer supports ExternalCatalog, while Flink Planner still supports it.

The new design of the Catalog

Here is the overall design of the new Catalog:

As you can see, the new Catalog has a three-tier structure (catalog_name.database_name.object_name): the top layer is the catalog name, the middle layer is the database, and the bottom layer is the various meta-objects, such as Table, Partition, and Function. Two Catalog implementations are currently built in: GenericInMemoryCatalog and HiveCatalog. Users can also implement their own Catalog.

What can a Catalog do? It supports CREATE, DROP, LIST, ALTER, and EXISTS operations on databases, tables, partitions, functions, statistics, and so on. In short, the common SQL syntax is already supported.

As its name suggests, CatalogManager manages catalogs, and it can manage multiple catalogs at the same time. This means that a single SQL statement can query or join across catalogs. For example, joining tables from Hive catalog A with tables from Hive catalog B is supported, which gives Flink queries great flexibility.
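The three-tier layout can be pictured as nested maps. The sketch below is a hypothetical toy store: the class `ThreeTierStore` and its methods are invented here to illustrate how a catalog name, a database name, and an object name together identify a meta-object. It is not Flink's `Catalog` interface, which is considerably richer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy three-tier metadata store: catalog -> database -> object name -> definition.
// Illustrative only; definitions are plain strings instead of real table/function metadata.
final class ThreeTierStore {
    private final Map<String, Map<String, Map<String, String>>> catalogs = new HashMap<>();

    /** Creates the catalog level on demand, then the database level. */
    void createDatabase(String catalog, String database) {
        catalogs.computeIfAbsent(catalog, c -> new HashMap<>())
                .computeIfAbsent(database, d -> new HashMap<>());
    }

    /** Adds a meta-object (table, function, ...) to an existing database. */
    void createObject(String catalog, String database, String name, String definition) {
        Map<String, Map<String, String>> dbs = catalogs.get(catalog);
        Map<String, String> objects = dbs == null ? null : dbs.get(database);
        if (objects == null) {
            throw new IllegalArgumentException("No such database: " + catalog + "." + database);
        }
        if (objects.putIfAbsent(name, definition) != null) {
            throw new IllegalArgumentException("Object already exists: " + name);
        }
    }

    /** Looks up an object by its full three-part path; empty if any level is missing. */
    Optional<String> lookup(String catalog, String database, String name) {
        return Optional.ofNullable(catalogs.get(catalog))
                .map(dbs -> dbs.get(database))
                .map(objects -> objects.get(name));
    }
}
```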

The operations supported by CatalogManager include:

  • Register a Catalog (registerCatalog)
  • Get all catalogs (getCatalogs)
  • Get a specific Catalog (getCatalog)
  • Get the current Catalog (getCurrentCatalog)
  • Set the current Catalog (setCurrentCatalog)
  • Get the current Database (getCurrentDatabase)
  • Set the current Database (setCurrentDatabase)

Although a Catalog has a three-tier structure, you do not have to specify all three levels; you can write just the table name. In that case, the system uses getCurrentCatalog and getCurrentDatabase to obtain the default values and completes the three-part path automatically. This design simplifies the use of catalogs. To switch the default catalog, simply call setCurrentCatalog.
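Here is a minimal sketch of that default-completion behaviour, assuming a simplified manager that only tracks catalog names, the current catalog, and the current database. `MiniCatalogManager` is a hypothetical class whose method names echo the operations listed above; it is not Flink's CatalogManager. Resetting the current database to the new catalog's default database in `setCurrentCatalog` is an assumption of this sketch, and the names `default_catalog` and `default_database` in the usage below are illustrative.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical miniature of the CatalogManager behaviour described above (not Flink's class).
final class MiniCatalogManager {
    // catalog name -> that catalog's default database
    private final Map<String, String> catalogs = new LinkedHashMap<>();
    private String currentCatalog;
    private String currentDatabase;

    MiniCatalogManager(String builtinCatalog, String builtinDatabase) {
        registerCatalog(builtinCatalog, builtinDatabase);
        setCurrentCatalog(builtinCatalog);
    }

    void registerCatalog(String name, String defaultDatabase) {
        if (catalogs.putIfAbsent(name, defaultDatabase) != null) {
            throw new IllegalArgumentException("Catalog already registered: " + name);
        }
    }

    Set<String> getCatalogs() { return catalogs.keySet(); }

    String getCurrentCatalog() { return currentCatalog; }

    String getCurrentDatabase() { return currentDatabase; }

    void setCurrentCatalog(String name) {
        if (!catalogs.containsKey(name)) {
            throw new IllegalArgumentException("Unknown catalog: " + name);
        }
        currentCatalog = name;
        currentDatabase = catalogs.get(name); // assumption: switch to the catalog's default database
    }

    void setCurrentDatabase(String database) { currentDatabase = database; }

    /** Completes "t", "db.t" or "cat.db.t" to a full [catalog, database, table] path. */
    List<String> qualify(String identifier) {
        String[] parts = identifier.split("\\.");
        switch (parts.length) {
            case 1: return Arrays.asList(currentCatalog, currentDatabase, parts[0]);
            case 2: return Arrays.asList(currentCatalog, parts[0], parts[1]);
            case 3: return Arrays.asList(parts);
            default: throw new IllegalArgumentException("Expected [[catalog.]db.]table: " + identifier);
        }
    }
}
```

For example, after `registerCatalog("hive", "sales")` and `setCurrentCatalog("hive")`, `qualify("orders")` yields `[hive, sales, orders]`, while `qualify("db2.orders")` keeps the explicit database and yields `[hive, db2, orders]`.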

The TableEnvironment layer provides methods for working with catalogs, for example:

  • Register a Catalog (registerCatalog)
  • List all catalogs (listCatalogs)
  • Get a specified Catalog (getCatalog)
  • Use a Catalog (useCatalog)

The SQL Client layer also offers some support, but its functionality is limited. Users cannot create a catalog directly with a CREATE statement. Instead, they describe the catalog in a YAML file and pass that file with -e <file_path> when starting the SQL Client. The SQL Client currently supports operations such as listing the defined catalogs and using an existing catalog.

DDL design and use

With catalogs in place, you can use DDL to manipulate their contents. DDL statements can be executed with the sqlUpdate() method of TableEnvironment, or in the SQL Client.

The sqlUpdate() method supports the CREATE TABLE, CREATE VIEW, DROP TABLE, and DROP VIEW commands. Statements such as INSERT INTO are of course also supported.

The four commands are described below:

  • Create Table: The catalog name and database name can be specified explicitly; if they are omitted, they are completed from the current catalog and database. Columns are declared with field names and types, and the PARTITIONED BY syntax is supported. Finally, the WITH clause specifies the connector to use, such as Kafka, CSV, or HBase. The WITH clause requires a number of properties, which are defined by each connector's factory; the factory declares which properties are required and which are optional.

Note that computed columns and watermark definitions are not yet supported in DDL; they will be added in future releases.

CREATE TABLE [[catalog_name.]db_name.]table_name (
  a INT COMMENT 'column comment',
  b BIGINT,
  c VARCHAR
) COMMENT 'table comment'
[PARTITIONED BY (b)]
WITH (
  'update-mode' = 'append',
  'connector.type' = 'kafka',
  ...
)
  • Create View: Specify the name of the view, followed by the SQL query. The view is stored in the Catalog.
CREATE VIEW view_name AS SELECT xxx
  • Drop Table & Drop View: Similar to standard SQL syntax, with support for IF EXISTS. Dropping a table that does not exist without IF EXISTS throws an exception.
DROP TABLE [IF EXISTS] [[catalog_name.]db_name.]table_name
  • DDL execution in the SQL Client: Support is mostly limited to views; only CREATE VIEW and DROP VIEW can be used. Catalogs, databases, tables, and functions can only be listed. In the SQL Client, users can use an existing catalog, set properties, and run statements such as DESCRIBE and EXPLAIN.
CREATE VIEW
DROP VIEW
SHOW CATALOGS/DATABASES/TABLES/FUNCTIONS
USE CATALOG xxx
SET xxx=yyy
DESCRIBE table_name
EXPLAIN SELECT xxx
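The Create Table bullet above says each connector's factory declares which WITH properties are required and which are optional. The following hypothetical Java sketch shows the kind of check this enables. `WithClauseValidator` and the key sets used with it are invented for illustration and do not reproduce the exact contract of Flink's Kafka (or any other) connector factory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical validator: every required key must be present, and every key present
// must be either required or optional. Real Flink factories express this differently.
final class WithClauseValidator {
    private final Set<String> required;
    private final Set<String> optional;

    WithClauseValidator(Set<String> required, Set<String> optional) {
        this.required = required;
        this.optional = optional;
    }

    /** Returns readable problems in sorted key order; an empty list means the WITH clause is accepted. */
    List<String> validate(Map<String, String> with) {
        List<String> problems = new ArrayList<>();
        for (String key : new TreeSet<>(required)) {
            if (!with.containsKey(key)) {
                problems.add("missing required property: " + key);
            }
        }
        for (String key : new TreeSet<>(with.keySet())) {
            if (!required.contains(key) && !optional.contains(key)) {
                problems.add("unsupported property: " + key);
            }
        }
        return problems;
    }
}
```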

DDL support in Flink 1.9 is already in place, but some features still need to be improved in the future.

Blink Planner

This section first walks through how SQL and Table API programs are transformed into an actual JobGraph, to give you a clear picture of Blink Planner that should help when reading the Blink code or using Blink. It then introduces the improvements and optimizations in Blink Planner.

Figure 4 Main flow

As the figure above shows, the translation involves three layers: Table API/SQL, Blink Planner, and Runtime. The main steps are explained below.

  • Table API & SQL parsing and validation: In Flink 1.9, the Table API was extensively refactored to introduce a new set of Operations, which mainly describe the logical tree of a job.

When SQL is submitted, it is parsed first, producing a SqlNode tree, which is then validated. Validation consults FunctionManager and CatalogManager: FunctionManager looks up user-defined functions (UDFs) and checks whether they are valid, and CatalogManager checks whether the referenced table or database exists. The result is an Operation DAG.

This step shows that both the Table API and SQL are ultimately converted into the same structure in Flink: the Operation DAG.

  • Generating RelNodes: The Operation DAG is converted into a RelNode (relational expression) DAG.

  • Optimization: The optimizer applies various optimizations to the RelNode DAG. Its inputs are the optimization rules and various statistics. In Blink Planner, most optimization rules are currently shared by Stream and Batch. The differences are that Batch has no concept of state, while Stream does not support sort, so Blink Planner currently runs two separate rule sets and defines two separate sets of physical RelNodes: BatchPhysicalRel and StreamPhysicalRel. The output of the optimizer is a Physical Rel DAG.

  • Transformation: The Physical Rel DAG is converted into an ExecNode DAG. As the name suggests, ExecNode belongs to the execution layer, specifically Blink's execution layer. A large amount of code generation (CodeGen) happens in ExecNode, alongside some operators that are not code-generated. Finally, the ExecNode DAG is converted into a Transformation DAG.

  • Generating an executable JobGraph: The Transformation DAG is converted into a JobGraph, which completes the translation of the SQL or Table API program.
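The steps above can be summarized as a trace of intermediate representations. This is an illustrative sketch only: `PlanPipeline` is a hypothetical class, the stage names are labels taken from the steps described here, and `validate` merely mimics the existence checks made against FunctionManager and CatalogManager. No real parsing or optimization takes place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Hypothetical trace of the translation stages; strings are labels, not real plans.
final class PlanPipeline {
    private PlanPipeline() {}

    // Stages shared by SQL and the Table API once both have become an Operation DAG.
    static final List<String> COMMON_STAGES = Arrays.asList(
            "Operation DAG",        // unified representation of SQL and Table API
            "RelNode DAG",          // logical relational expressions
            "Physical Rel DAG",     // optimizer output (BatchPhysicalRel or StreamPhysicalRel)
            "ExecNode DAG",         // Blink execution layer, where code generation happens
            "Transformation DAG",
            "JobGraph");

    /** Mimics validation: every referenced table and UDF must be known, otherwise fail. */
    static void validate(Set<String> tables, Set<String> knownTables,
                         Set<String> functions, Set<String> knownFunctions) {
        for (String t : tables) {
            if (!knownTables.contains(t)) throw new IllegalArgumentException("Table not found: " + t);
        }
        for (String f : functions) {
            if (!knownFunctions.contains(f)) throw new IllegalArgumentException("Function not found: " + f);
        }
    }

    /** Lists the representations a program passes through, depending on its entry point. */
    static List<String> trace(boolean fromSql) {
        List<String> stages = new ArrayList<>();
        if (fromSql) {
            stages.add("SQL text");
            stages.add("SqlNode tree");           // parser output
            stages.add("validated SqlNode tree"); // after the FunctionManager/CatalogManager checks
        } else {
            stages.add("Table API calls");
        }
        stages.addAll(COMMON_STAGES);
        return stages;
    }
}
```

Both `trace(true)` and `trace(false)` end with the same six stages, reflecting the point that SQL and the Table API converge on the Operation DAG.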

Blink Planner improvements and optimizations

The functional improvements of Blink Planner mainly include the following:

  • More complete SQL syntax support: for example IN, EXISTS, NOT EXISTS, subqueries, complete OVER statements, and GROUPING SETS. Blink Planner also runs all queries of the TPC-H and TPC-DS benchmarks, with very good performance.
  • Richer and more efficient operators.
  • A complete cost model that can use the statistics stored in the Catalog, so that the optimizer can choose a better execution plan based on those statistics.
  • Join reorder is supported.
  • Shuffle service: For batch jobs, Blink Planner also supports a shuffle service, which greatly helps the stability of batch jobs: if a batch job fails, it can recover quickly using the shuffle service.

The performance improvements mainly include the following:

  • Segmented optimization.
  • Sub-plan reuse.
  • Richer optimization rules: more than a hundred in total, the vast majority of which are shared by Stream and Batch.
  • A more efficient data structure, BinaryRow, which avoids serialization and deserialization.
  • Mini-batch support (Stream only), which reduces state accesses.
  • Eliminating redundant Shuffle and Sort (Batch mode): If the data between two operators has already been shuffled by key A, and the downstream operator also needs the data shuffled by A, the intermediate shuffle can be omitted, saving a lot of network overhead. Sort is handled similarly. When shuffle and sort account for a large part of the computation, removing them improves overall performance.
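A minimal sketch of the redundant-shuffle idea, assuming a linear chain of operators in which every operator preserves the hash partitioning of its input (a simplification: real plans must also track projections and key renames). `ShufflePlanner` and the operator labels are hypothetical. An exchange is inserted only when an operator needs its input partitioned by a key the data is not already partitioned by.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical planner for a linear operator chain; not Blink Planner code.
final class ShufflePlanner {
    private ShufflePlanner() {}

    static final class Op {
        final String name;
        final String requiredHashKey; // null means the operator has no distribution requirement

        Op(String name, String requiredHashKey) {
            this.name = name;
            this.requiredHashKey = requiredHashKey;
        }
    }

    /** Returns the operator chain with hash exchanges inserted only where they are needed. */
    static List<String> plan(List<Op> chain) {
        List<String> out = new ArrayList<>();
        String currentKey = null; // assume the source output is not hash-partitioned
        for (Op op : chain) {
            if (op.requiredHashKey != null && !Objects.equals(op.requiredHashKey, currentKey)) {
                out.add("Exchange(hash[" + op.requiredHashKey + "])");
                currentKey = op.requiredHashKey; // data is now partitioned by this key
            }
            out.add(op.name); // simplification: every operator keeps the existing partitioning
        }
        return out;
    }
}
```

With the chain Scan → HashAgg(a) → HashJoin(a), only one exchange on `a` is planned, in front of the aggregate; the join reuses that partitioning instead of shuffling again.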

In-depth performance optimization and practice

In this section, we’ll use concrete examples to give you an in-depth understanding of the Blink Planner performance optimization design.

■ Segmented optimization

Example 5

create view MyView as select word, count(1) as freq from SourceTable group by word;
insert into SinkTable1 select * from MyView where freq > 10;
insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;

The SQL is converted into the following RelNode DAG:

Figure 5 Example 5 RelNode DAG

With Flink Planner, the execution-layer DAG generated after optimization would look like this:

Figure 6 Example 5 Old Planner DAG

As can be seen, the old planner simply traverses backward from each sink to the source, forming two independent execution paths. The figure clearly shows that the Scan and the first-level Aggregate are computed twice.

In Blink Planner, the execution-layer DAG generated after optimization looks like this:

Figure 7 Example 5 Blink Planner DAG

Instead of optimizing each INSERT INTO statement as soon as it is called, Blink Planner buffers all INSERT INTO operations and optimizes them together just before execution. This way it sees the complete execution graph and can tell which parts are computed repeatedly. Blink Planner finds these repeated parts by searching for the largest common subgraph that can be optimized. After optimization, Blink Planner treats the largest common subgraph as a temporary table that the other parts use directly.

The graph above can thus be divided into three parts: the largest common subgraph (the temporary table); the temporary table followed by the Filter and SinkTable1; and the temporary table followed by the second Aggregate and SinkTable2.

Blink Planner actually uses declared views to find the largest common subgraph. So if some logic needs to be reused during development, define it as a view: this makes full use of Blink Planner's segmented optimization and reduces repeated computation.

Of course, the current optimization is not perfect: because the graph is cut in advance, some optimization opportunities may be lost. This algorithm will continue to be improved.

In short, Blink Planner's segmented optimization solves the multi-sink optimization problem (DAG optimization). A single sink is not the concern of segmented optimization: with a single sink, all nodes can be optimized together without segmentation.
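One simplified way to see which part of Example 5's plan is shared is to count, for each node, how many sinks can reach it. The sketch below assumes the plan is given as an adjacency map from each node to its inputs. `SharedSubgraph` is a hypothetical helper, and counting sink reachability is a stand-in chosen for illustration, not Blink Planner's actual algorithm (which, per this article, works from declared views).

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: finds nodes that feed more than one sink in a plan DAG.
final class SharedSubgraph {
    private SharedSubgraph() {}

    /** inputs maps each node to the nodes it reads from; returns nodes reachable from 2+ sinks. */
    static Set<String> shared(Map<String, List<String>> inputs, List<String> sinks) {
        Map<String, Integer> sinksReaching = new HashMap<>();
        for (String sink : sinks) {
            // Depth-first walk from this sink towards the sources, visiting each node once.
            Set<String> seen = new HashSet<>();
            Deque<String> stack = new ArrayDeque<>();
            stack.push(sink);
            while (!stack.isEmpty()) {
                String node = stack.pop();
                if (!seen.add(node)) {
                    continue;
                }
                for (String in : inputs.getOrDefault(node, Collections.<String>emptyList())) {
                    stack.push(in);
                }
            }
            for (String node : seen) {
                sinksReaching.merge(node, 1, Integer::sum);
            }
        }
        // Nodes needed by several sinks form the common part that can be computed once.
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Integer> e : sinksReaching.entrySet()) {
            if (e.getValue() > 1) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

For Example 5 modelled as Scan → Agg1, then Agg1 → Filter → Sink1 and Agg1 → Agg2 → Sink2, the nodes reached from both sinks are `Agg1` and `Scan`, which corresponds to the temporary-table part.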

■ Sub-plan reuse

Example 6

insert into SinkTable
select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'
union all
select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;

The SQL in this example is similar to the one used for segmented optimization, except that instead of writing the results into two tables, it unions them and writes them into a single result table.

Here is the RelNode DAG it is converted into:

Figure 8 Example 6 RelNode DAG

As the figure shows, the Scan and the first-level Aggregate are again duplicated. Blink Planner detects them and turns the plan into the following:

Figure 9 Example 6 Blink Planner DAG

Sub-plan reuse is controlled by two configuration options:

  • table.optimizer.reuse-sub-plan-enabled (enabled by default)
  • table.optimizer.reuse-source-enabled (enabled by default)

Both options are enabled by default and can be disabled as needed. table.optimizer.reuse-source-enabled deserves attention because in Batch mode it can cause a join to deadlock. A hash join or nested-loop join reads its build side completely before it reads its probe side. If source reuse is enabled and both sides read from the same source, the source sends its data to the build side and the probe side at the same time. The data sent to the probe side is not consumed until the build side has finished, so the source blocks, the build side never finishes, and the join hangs.

To resolve the deadlock, Blink Planner first spills the probe-side data to disk, so that the build side can read its data normally. Once the build side has read all its data, the probe-side data is read back from disk. This resolves the deadlock, but at an extra cost: one additional disk write. If reading the source twice is cheaper than writing it to disk once, it is better to disable source reuse for better performance.

Conversely, if reading the source twice costs much more than spilling it to disk once, you can keep reuse-source enabled. Stream mode does not have this deadlock, because stream joins do not have the build/probe side-selection problem.

In summary, sub-plan reuse is a subgraph reuse problem. It is similar to segmented optimization, and the two complement each other.
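A common way to detect identical sub-plans is to give every subtree a structural digest and keep a single instance per digest. The Java sketch below applies that idea to Example 6. It is an illustration under stated assumptions: `PlanNode`, `SubPlanReuse`, and the digest format are invented here, operators are plain labels, and this is not Blink Planner's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plan node: an operator label plus its inputs.
final class PlanNode {
    final String op;
    final List<PlanNode> inputs;

    PlanNode(String op, List<PlanNode> inputs) {
        this.op = op;
        this.inputs = inputs;
    }

    /** Structural digest, e.g. "Agg(Scan())"; equal digests mean identical sub-plans. */
    String digest() {
        StringBuilder sb = new StringBuilder(op).append('(');
        for (int i = 0; i < inputs.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(inputs.get(i).digest());
        }
        return sb.append(')').toString();
    }
}

final class SubPlanReuse {
    private SubPlanReuse() {}

    /** Rebuilds the plan bottom-up so that structurally identical subtrees share one object. */
    static PlanNode reuse(PlanNode node, Map<String, PlanNode> byDigest) {
        List<PlanNode> newInputs = new ArrayList<>();
        for (PlanNode in : node.inputs) {
            newInputs.add(reuse(in, byDigest)); // inputs are deduplicated first
        }
        PlanNode rebuilt = new PlanNode(node.op, newInputs);
        // The first subtree with a given digest becomes the shared instance.
        return byDigest.computeIfAbsent(rebuilt.digest(), d -> rebuilt);
    }
}
```

Applied to Example 6, the two `Agg(Scan())` branches under the Union collapse into one object that both the Filter branch and the second Aggregate read from.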

Note: Hash Join: To join two tables T1 and T2, one table is chosen and a hash table is built on the columns given by the join condition. The other table is then scanned row by row, looking up whether the hash table contains rows with equal keys, to complete the join; this lookup is called probing. The former table is called the build table and the latter the probe table.
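The note above maps directly onto a small in-memory implementation. In this hypothetical `HashJoin` sketch, rows are `String[]` with the join key at a given column index (an assumption made for brevity), and the whole build input is read before the probe input is touched; that ordering is exactly why a shared source feeding both sides can block, as described for reuse-source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal in-memory equi-join following the build/probe scheme in the note above.
final class HashJoin {
    private HashJoin() {}

    /** Joins build rows and probe rows on equal keys; output rows are build columns then probe columns. */
    static List<String[]> join(List<String[]> build, int buildKey, List<String[]> probe, int probeKey) {
        // Build phase: the entire build input is consumed before any probe row is read.
        Map<String, List<String[]>> table = new HashMap<>();
        for (String[] row : build) {
            table.computeIfAbsent(row[buildKey], k -> new ArrayList<>()).add(row);
        }
        // Probe phase: look up each probe row's key in the hash table.
        List<String[]> out = new ArrayList<>();
        for (String[] row : probe) {
            for (String[] match : table.getOrDefault(row[probeKey], Collections.<String[]>emptyList())) {
                String[] joined = new String[match.length + row.length];
                System.arraycopy(match, 0, joined, 0, match.length);
                System.arraycopy(row, 0, joined, match.length, row.length);
                out.add(joined);
            }
        }
        return out;
    }
}
```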

■ Aggregate optimizations

Blink supports a rich set of aggregate operations:

  • Group Agg, for example: select count(a) from t group by b
  • Over Agg, for example: select count(a) over (partition by b order by c) from t
  • Window Agg, for example: select count(a) from t group by tumble(ts, interval '10' second), b
  • Table Agg, for example: tEnv.scan("t").groupBy("a").flatAggregate("flatAggFunc(b) as (c, d)")

The following focuses on Group Agg, for which there are two main types of optimization.

■ Local/Global Agg optimization

Local/Global Agg is used to reduce network shuffle. Local/Global optimization requires the following conditions:

  • All aggregate functions of the Aggregate are mergeable: each must implement a merge method. SUM, COUNT, and AVG, for example, can be computed in multiple stages and their partial results merged at the end. Functions such as the median or the 95th percentile cannot be computed this way, so Local/Global optimization cannot be applied to them.
  • table.optimizer.agg-phase-strategy is set to AUTO or TWO_PHASE.
  • In Stream mode, mini-batch is enabled. In Batch mode, AUTO decides whether to apply Local/Global optimization based on the cost model and statistics.

Example 7

select count(*) from t group by color

Without optimization, 10 records are shuffled to the Aggregate below.

Figure 10 Example 7 Unoptimized Count operation

With Local/Global optimization, local aggregation is performed first and the data is then shuffled, leaving only 6 records to shuffle. In Stream mode, Blink pre-aggregates the results per mini-batch and then sends them to the Global Agg to be combined.

Figure 11 Example 7 Local/Global-optimized Count operation
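To see why pre-aggregation reduces shuffle, here is a minimal Java sketch of one-phase versus two-phase `COUNT(*) ... GROUP BY`. Each inner list stands for the records held by one upstream parallel task, and `shuffledRecords` counts the records that would cross the network. The class names and the input data are made up for illustration; summing partial counts is what makes COUNT mergeable.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Result of one simulated aggregation run.
final class AggResult {
    final Map<String, Long> counts;
    final int shuffledRecords;

    AggResult(Map<String, Long> counts, int shuffledRecords) {
        this.counts = counts;
        this.shuffledRecords = shuffledRecords;
    }
}

// Hypothetical simulation of COUNT(*) GROUP BY key over several upstream tasks.
final class CountByKey {
    private CountByKey() {}

    /** One phase: every raw record is shuffled to the task that owns its key. */
    static AggResult onePhase(List<List<String>> partitions) {
        Map<String, Long> global = new TreeMap<>();
        int shuffled = 0;
        for (List<String> part : partitions) {
            for (String key : part) {
                global.merge(key, 1L, Long::sum);
                shuffled++;
            }
        }
        return new AggResult(global, shuffled);
    }

    /** Two phases: each task pre-aggregates locally; only (key, partial count) pairs are shuffled. */
    static AggResult twoPhase(List<List<String>> partitions) {
        Map<String, Long> global = new TreeMap<>();
        int shuffled = 0;
        for (List<String> part : partitions) {
            Map<String, Long> local = new HashMap<>();
            for (String key : part) {
                local.merge(key, 1L, Long::sum); // Local Agg
            }
            for (Map.Entry<String, Long> e : local.entrySet()) {
                global.merge(e.getKey(), e.getValue(), Long::sum); // Global Agg merges partial counts
                shuffled++;
            }
        }
        return new AggResult(global, shuffled);
    }
}
```

With two tasks holding five records each and three distinct colors per task, the one-phase plan ships 10 records and the two-phase plan ships 6 partial counts, while both produce the same final counts.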

■ Distinct Agg optimization

Distinct Agg is optimized by rewriting the SQL, but Batch mode and Stream mode address different problems:

  • In Batch mode, the distinct values must be computed first and the aggregate applied afterwards, so logically two steps are required; implementing Distinct Agg directly is too costly.
  • In Stream mode, the main problem is hotspots: Stream mode must keep all input data in state, so if the data contains hot keys, state is accessed very frequently, which hurts performance.

Batch mode

The first layer groups the data so that both the distinct and the non-distinct aggregate functions can be computed: it deduplicates the input of the distinct aggregates and computes the non-distinct aggregates. The second layer computes the distinct aggregate functions.

Example 8

select color, count(distinct id), count(*) from t group by color

Rewritten by hand, this is equivalent to:

select color, count(id), min(cnt) filter (where $e = 2) from (
   select color, id, $e, count(*) as cnt from (
      select color, id, 1 as $e from t          -- rows for count(distinct id)
      union all
      select color, null as id, 2 as $e from t  -- rows for count(*)
   ) group by color, id, $e
) group by color  -- count(id) skips the null ids of the $e = 2 rows

The logical process of transformation is shown in the figure below:

Figure 12 Example 8 Batch mode Distinct rewrite logic
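The rewrite can be checked by evaluating both forms over the same rows. The sketch below is a hypothetical Java illustration: `DistinctRewrite` is invented, rows are `(color, id)` pairs, and `id` may be null. `direct` computes `count(distinct id)` and `count(*)` per color, while `rewritten` follows the expand, group, and regroup structure of the rewritten SQL.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical check of the Batch-mode distinct rewrite. Output per color: {count(distinct id), count(*)}.
final class DistinctRewrite {
    private DistinctRewrite() {}

    /** Direct: select color, count(distinct id), count(*) from t group by color */
    static Map<String, long[]> direct(List<String[]> rows) {
        Map<String, Set<String>> ids = new HashMap<>();
        Map<String, long[]> out = new TreeMap<>();
        for (String[] r : rows) {
            Set<String> s = ids.computeIfAbsent(r[0], k -> new HashSet<>());
            if (r[1] != null) {
                s.add(r[1]); // COUNT(DISTINCT) ignores NULL
            }
            out.computeIfAbsent(r[0], k -> new long[2])[1]++;
        }
        for (Map.Entry<String, long[]> e : out.entrySet()) {
            e.getValue()[0] = ids.get(e.getKey()).size();
        }
        return out;
    }

    /** Two-level evaluation mirroring the rewritten SQL. */
    static Map<String, long[]> rewritten(List<String[]> rows) {
        // Level 1: expand each row into ($e=1: color, id) and ($e=2: color, null), group by (color, id, $e).
        Map<List<String>, Long> level1 = new HashMap<>();
        for (String[] r : rows) {
            level1.merge(Arrays.asList(r[0], r[1], "1"), 1L, Long::sum);
            level1.merge(Arrays.asList(r[0], null, "2"), 1L, Long::sum);
        }
        // Level 2: group by color.
        Map<String, long[]> out = new TreeMap<>();
        for (Map.Entry<List<String>, Long> e : level1.entrySet()) {
            List<String> k = e.getKey();
            long[] acc = out.computeIfAbsent(k.get(0), c -> new long[2]);
            if (k.get(1) != null) {
                acc[0]++; // count(id): one non-null id per distinct (color, id) group
            }
            if (k.get(2).equals("2")) {
                acc[1] = e.getValue(); // min(cnt) filter (where $e = 2)
            }
        }
        return out;
    }
}
```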

Stream mode

Enabling the optimization in Stream mode has the following requirements:

  • All aggregate functions must be among the supported ones: avg/count/min/max/sum/first_value/concat_agg/single_value;
  • table.optimizer.distinct-agg.split.enabled must be turned on (it is off by default).

Example 9

select color, count(distinct id), count(*) from t group by color

Rewritten by hand, this is equivalent to:

select color, sum(dcnt), sum(cnt) from (
  select color, count(distinct id) as dcnt, count(*) as cnt from t 
  group by color, mod(hash_code(id), 1024)
) group by color

Before the rewrite, the logical plan looks like this:

Figure 13 Example 9 Distinct in Stream mode without optimization

After the rewrite, the logical plan looks like this, with hot data scattered across multiple intermediate nodes.

Figure 14 Example 9 Distinct in Stream mode with optimization

Note that 1024 in mod(hash_code(id), 1024) in the SQL of Example 9 is the number of buckets the data is scattered into. A larger value is recommended; a small value may not be very effective.
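A minimal sketch of why the split is safe: because a given id always lands in the same bucket, the distinct ids of a color are partitioned across buckets, so summing the per-bucket distinct counts gives the exact total. `SplitDistinct` is hypothetical, and `String.hashCode` with `Math.floorMod` stands in for `mod(hash_code(id), 1024)`; Flink's own hash function may differ. Null ids are skipped, since COUNT(DISTINCT) ignores them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical simulation of the Stream-mode split of count(distinct id) per color.
final class SplitDistinct {
    private SplitDistinct() {}

    /** Direct: color -> count(distinct id). */
    static Map<String, Long> direct(List<String[]> rows) {
        Map<String, Set<String>> ids = new HashMap<>();
        for (String[] r : rows) {
            if (r[1] == null) continue;
            ids.computeIfAbsent(r[0], c -> new HashSet<>()).add(r[1]);
        }
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : ids.entrySet()) {
            out.put(e.getKey(), (long) e.getValue().size());
        }
        return out;
    }

    /** Split: count(distinct id) per (color, bucket) first, then sum(dcnt) per color. */
    static Map<String, Long> split(List<String[]> rows, int buckets) {
        Map<String, Map<Integer, Set<String>>> partial = new HashMap<>();
        for (String[] r : rows) {
            if (r[1] == null) continue;
            int bucket = Math.floorMod(r[1].hashCode(), buckets); // same id -> same bucket
            partial.computeIfAbsent(r[0], c -> new HashMap<>())
                   .computeIfAbsent(bucket, b -> new HashSet<>())
                   .add(r[1]);
        }
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Map<Integer, Set<String>>> e : partial.entrySet()) {
            long sum = 0;
            for (Set<String> idsInBucket : e.getValue().values()) {
                sum += idsInBucket.size(); // the second-level sum(dcnt)
            }
            out.put(e.getKey(), sum);
        }
        return out;
    }
}
```

Because correctness does not depend on the number of buckets, the bucket count only controls how widely a hot color's state is spread across intermediate nodes.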

Conclusion

This article first introduced the overall design of the new TableEnvironment and listed the TableEnvironment choices in each mode. It then showed, through concrete examples, how to write code in each mode and what to watch out for.

In the Catalog and DDL part, the overall design of the Catalog and the use of DDL were also explained with examples. Finally, the article explained how Blink Planner translates SQL/Table API programs, the improvements in Blink Planner, and the principles behind its optimizations. We hope it helps you explore and use Flink SQL.