Author | Wu Chong (Yunxie), Alibaba technical expert
Compiled by | Chen Jingmin (Qingyue), Alibaba technical expert
Abstract: This article is based on a talk by Apache Flink PMC member Wu Chong (Yunxie) and was compiled by Chen Jingmin (Qingyue) to help you better understand how the Flink SQL engine works. It is divided into the following four parts:
- Flink SQL Architecture
- How Flink SQL Works?
- Flink SQL Optimizations
- Summary and Futures
Tips: Click the link below to view the original video shared by the author ~ ververica.cn/developers/…
The Apache Flink community made a number of architectural improvements toward unified stream and batch processing in its last two releases (1.9 & 1.10). One of the major changes was the introduction of the Blink Planner, which began support for compiling SQL & Table API programs with different SQL planners (that is, making the planner pluggable).
This article first introduces the thinking behind these optimizations, showing how a unified architecture can better handle streaming and batch queries, and then takes an in-depth look at the compilation and optimization process for Flink SQL, including:
- Flink SQL uses Apache Calcite to translate SQL into relational algebra expressions, applies optimization techniques such as Expression Reduce and Predicate/Projection Pushdown to generate physical execution plans, and uses Codegen to generate efficient execution code.
- Flink SQL uses the efficient binary data structure BinaryRow to speed up computation, uses Mini-batch to improve throughput and reduce the data jitter caused by Retraction in two-level aggregation, and applies dedicated optimizations for data skew and Top-N sorting in aggregation scenarios.
Flink SQL Architecture & Blink Planner (1.9+)
1.1 Limitations of the Old Planner
To understand why Flink SQL introduced the new architecture in 1.9, let’s first look at the architecture design prior to 1.9.
As can be seen from the figure, although the user-facing Table API and SQL are unified, streaming and batch tasks are mapped to the DataStream API and the DataSet API respectively at the translation layer. At the Runtime level, execution plans are also produced separately for each API. Maintaining two separate designs limits the number of reusable modules and makes the system difficult to extend.
1.2 Unified Blink Planner
From the start, Flink was designed around the idea that “batch is a special case of streaming”, and unifying stream and batch at the architecture level is the general trend. With the joint efforts of the community and Alibaba, version 1.9 introduced the new Blink Planner, which treats batch SQL processing as a special case of stream SQL processing and abstracts and reuses common processing and optimization logic as much as possible. Stream and batch are handled uniformly through Flink’s internal Stream Transformation API, replacing the original Flink Planner’s separate handling of the two.
In addition, the new architecture remains compatible with the Old Planner through a flexible plug-in mechanism, so users can choose which planner to use. In version 1.11, however, Blink Planner will replace Old Planner as the default Planner to support further stream-batch convergence (Old Planner will be phased out later).
Flink SQL workflow
The workflow summary for the Flink SQL engine is shown in figure 1.
As can be seen from the figure, a SQL query or a program written with the Table API (hereinafter referred to as TableAPI code) goes through the following stages on its way from input to an executable JobGraph:
- Convert SQL text/TableAPI code into Logical execution plans
- Logical plans are optimized to Physical execution plans by the optimizer
- Code generation then produces Transformations, which are further compiled into an executable JobGraph and submitted to run
This section focuses on the common optimization methods of the Flink SQL optimizer and on how CodeGen generates Transformations.
2.1 Logical Planning
The Flink SQL engine uses the Apache Calcite SQL Parser to parse SQL text into a syntax tree. The SQL Validator obtains metadata from the Catalog to analyze and validate the tree, which is then converted into a relational algebra expression (RelNode). From this expression the Optimizer builds the logical execution plan in its initial state.
Note: TableAPI code is validated against the Catalog by the TableAPI Validator and then produces a logical execution plan in the same way.
E.g.1 Consider the following SQL statement, which expresses a JOIN operation.
SELECT
t1.id, 1 + 2 + t1.value AS v
FROM t1, t2
WHERE
t1.id = t2.id AND
t2.id < 1000
The result is a tree-structured logical execution plan. The root node corresponds to the outermost Select statement, the leaf nodes correspond to the TableScan operations on input tables t1 and t2, and the join and the WHERE condition correspond to the Join and Filter nodes respectively.
LogicalProject(id=[$0], v=[+(+(1, 2), $1)])
+- LogicalFilter(condition=[AND(=($0, $3), <($3, 1000))])
   +- LogicalJoin(condition=[true], joinType=[inner])
      :- LogicalTableScan(table=[[default_catalog, default, t1]])
      +- LogicalTableScan(table=[[default_catalog, default, t2]])
Visualized (see the figure), this is the initial state from which the optimizer starts working.
Let’s look at a few common optimizations performed by the Flink SQL optimizer.
2.1.1 Expression Reduce
Expressions are the most common construct in SQL. For example, t1.id is an expression, and 1 + 2 + t1.value is also an expression. During optimization, the optimizer recursively traverses the nodes of the tree and pre-computes the value of every expression it can evaluate ahead of time. This process is called expression folding. The transformation is logically equivalent, and after it the engine no longer needs to evaluate 1 + 2 for each record at execution time.
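The folding step can be sketched in a few lines of Python. This is only an illustration of the idea, not Calcite’s implementation: the `Lit`, `Col` and `Add` classes and the `fold` function are hypothetical names introduced here, and only integer addition is modeled.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Lit:
    value: int


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Add:
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, Add]


def fold(expr):
    """Recursively pre-compute additions whose operands are both literals."""
    if isinstance(expr, Add):
        left, right = fold(expr.left), fold(expr.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(left.value + right.value)
        return Add(left, right)
    return expr  # literals and column references are left untouched


# 1 + 2 + t1.value is parsed as (1 + 2) + t1.value
original = Add(Add(Lit(1), Lit(2)), Col("t1.value"))
folded = fold(original)
```

After folding, the expression tree contains the literal 3 in place of the sub-tree 1 + 2, so the per-record work shrinks to a single addition.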
2.1.2 Pushdown Optimization
Pushdown optimization means pushing the transformations in a SQL statement as close to the data source as possible to get better performance, while preserving the semantics of the relational algebra. Common pushdown optimizations include Predicate Pushdown and Projection Pushdown (the latter is sometimes called column pruning).
- Predicate Pushdown
Looking back at E.g.1, the filter condition t2.id < 1000 in the WHERE clause constrains only table t2, has nothing to do with table t1, and can therefore be applied before the JOIN. Suppose table t2 holds one million rows, of which only 1,000 have id < 1000. After predicate pushdown, the amount of data reaching the JOIN node shrinks by a factor of 1,000, which greatly reduces I/O overhead and improves JOIN performance.
Predicate Pushdown is a basic technique for optimizing SQL queries. The term predicate comes from mathematics and refers to a function or expression that returns a Boolean value (TRUE/FALSE) and can therefore be used to filter data. Predicate pushdown moves the Filter as close to the data source as possible (for example, into the SCAN phase that reads the data) to reduce the number of records queried and passed along, without changing the semantics of the relational algebra.
- Projection Pushdown
Column pruning is a more intuitive name for Projection Pushdown. It means dropping unused columns during optimization to reduce I/O overhead and improve performance. Unlike predicate pushdown, which only moves nodes, projection pushdown may increase the number of nodes: if no Projection node sits above a TableScan node, the optimizer explicitly adds one. In addition, if the input table uses columnar storage (such as Parquet or ORC), the optimization is pushed further down into the Scan operation itself.
Looking back at E.g.1, the whole query uses only the id and value fields of table t1 and the id field of table t2. A Projection node is therefore added above each TableScan node to remove the unused fields, which greatly reduces I/O overhead.
To recap, predicate pushdown and projection pushdown reduce I/O overhead and improve performance by avoiding the processing of unnecessary records and unnecessary fields, respectively.
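The effect of both pushdowns on E.g.1 can be checked with a small in-memory simulation. This is a sketch under stated assumptions: the table contents, the extra columns and the `hash_join` helper are made up for illustration and have nothing to do with how Flink executes the join.

```python
# Hypothetical in-memory tables: t1(id, value, extra) and t2(id, payload)
t1 = [(i, i * 10, "unused") for i in range(0, 5000, 7)]
t2 = [(i, "unused") for i in range(0, 100000, 3)]


def hash_join(left, right):
    """Equi-join on field 0; also reports how many right-side rows entered the join."""
    index = {}
    for r in right:
        index.setdefault(r[0], []).append(r)
    joined = [(l, r) for l in left for r in index.get(l[0], [])]
    return joined, len(right)


# Plan A: join first, then filter on t2.id < 1000, then project.
joined_a, rows_in_a = hash_join(t1, t2)
result_a = [(l[0], 3 + l[1]) for l, r in joined_a if r[0] < 1000]

# Plan B: filter and prune columns below the join.
t1_pruned = [(row[0], row[1]) for row in t1]              # keep only id, value
t2_pruned = [(row[0],) for row in t2 if row[0] < 1000]    # keep only id, and only id < 1000
joined_b, rows_in_b = hash_join(t1_pruned, t2_pruned)
result_b = [(l[0], 3 + l[1]) for l, r in joined_b]
```

Both plans return the same rows, but in plan B far fewer t2 rows reach the join and every row carries fewer fields.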
2.2 Physical Planning on Batch
Through the above series of operations, we obtain the optimized logical execution plan. A logical execution plan describes the steps to execute and what each step must do, but not how each operation is implemented. The physical execution plan takes the characteristics of the physical implementation into account and chooses a concrete implementation for each operation, for example whether a Join uses SortMergeJoin, HashJoin or BroadcastHashJoin. While generating the plan, the optimizer computes the Cost of every node in the tree. For nodes with several possible implementations (such as Join nodes), it expands all of them and computes the Cost of each. Finally, the implementation with the lowest Cost along the entire path is selected as the final Physical Plan.
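The selection step can be sketched as “enumerate the candidates, cost each one, keep the cheapest”. The cost formulas, the `parallelism` and `broadcast_limit` parameters and the function names below are invented purely for illustration. Flink’s real cost model is different and considers more factors.

```python
import math


def candidate_costs(left_rows, right_rows, parallelism=4, broadcast_limit=10_000):
    """Return a toy cost for every join implementation that is applicable."""
    small, large = sorted((left_rows, right_rows))
    costs = {
        # shuffle both inputs, then sort both inputs
        "SortMergeJoin": sum(n + n * math.log2(max(n, 2)) for n in (left_rows, right_rows)),
        # shuffle both inputs, then build and probe a hash table
        "HashJoin": 2 * (left_rows + right_rows),
    }
    if small <= broadcast_limit:
        # copy the small side to every task, scan the large side in place
        costs["BroadcastHashJoin"] = large + small * parallelism
    return costs


def choose_join(left_rows, right_rows):
    """Pick the implementation with the lowest toy cost."""
    costs = candidate_costs(left_rows, right_rows)
    return min(costs, key=costs.get)


small_side_choice = choose_join(1_000_000, 1_000)
large_side_choice = choose_join(1_000_000, 1_000_000)
```

With these toy numbers, a 1,000-row input makes the broadcast variant the cheapest, while two large inputs rule it out entirely.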
Returning to E.g.1: when it is executed in batch mode, Statistics are available for the input tables. After the optimizations above, only 1,000 rows of table t2 reach the Join node, so BroadcastJoin has a relatively low cost, and the final Physical Plan is as shown in the figure below.
2.3 Translation & Code Generation
Code Generation is a widely used technique in computing. In Flink SQL it is used to turn the Physical Plan into a Transformation tree.
Returning to E.g.1, for the Calc node above table t2, Code Generation turns the expression t2.id < 1000 into Java code for a Transformation Operator that forwards every received Row with id < 1000 to the next Operator.
In this way the Flink SQL engine translates the Physical Plan into Transformations through Code Generation and then compiles them into an executable JobGraph.
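The idea can be illustrated with a tiny generator that emits source code for a specialized filter and compiles it at runtime. Flink generates Java, so this Python version is only an analogy, and `generate_filter` and `process` are hypothetical names.

```python
def generate_filter(field_index, op, literal):
    """Emit source code for a row filter specialized to one predicate, then compile it."""
    if op not in {"<", "<=", ">", ">=", "==", "!="}:
        raise ValueError(f"unsupported operator: {op}")
    source = (
        "def process(rows):\n"
        "    for row in rows:\n"
        f"        if row[{int(field_index)}] {op} {literal!r}:\n"
        "            yield row\n"
    )
    namespace = {}
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["process"], source


# Specialize the operator for the predicate id < 1000 (id is field 0 here)
process, source = generate_filter(0, "<", 1000)
kept = list(process([(5,), (1000,), (999,), (2048,)]))
```

The generated function contains the comparison as literal code, so nothing has to interpret an expression tree for each record.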
2.4 Physical Planning on Stream
The example above assumes that the Flink SQL engine compiles in batch mode. Let’s now look at an important mechanism in Physical Plan generation when compiling in stream mode: the Retraction Mechanism (a.k.a. Changelog Mechanism).
2.4.1 Retraction Mechanism
Retraction is a mechanism for taking back results that were emitted early (Early Firing) in stream processing, similar to an Update operation in a traditional database. Without Retraction, complex SQL such as cascading aggregations produces results that differ from those of batch processing, which is a shortcoming of many streaming engines in the industry today.
E.g.2 Consider the following SQL, which computes the distribution of word frequencies.
SELECT cnt, COUNT(cnt) as freq
FROM (
SELECT word, COUNT(*) as cnt
FROM words
GROUP BY word)
GROUP BY cnt
Suppose the input data is:
After the above calculation, the expected output should be:
However, unlike batch processing, data in stream processing arrives record by record, and in theory each record triggers a computation. After the first Hello and the first World have been processed, the number of words with frequency 1 is already 2. If that result cannot be corrected when the second Hello arrives, Hello is counted both under frequency 1 and under frequency 2, which is clearly wrong. This is the problem caused by the lack of Retraction.
An important contribution of Flink SQL to stream computing is that it was the first to propose a concrete implementation of this mechanism. The Retraction mechanism is also known as the Changelog mechanism because Flink, to some extent, treats the input stream as the Changelog of a database: each input record can be regarded as a change operation on the database, such as an Insert, Delete or Update. Take MySQL as an example: its Binlog is stored in binary format, and an Update_rows_log_event carries two images, the Before Image (BI) and the After Image (AI), which hold the content of a row before and after the update.
When the Flink SQL optimizer generates the Physical Plan of a streaming job, it determines whether the current node performs an update. If so, the node sends two messages, update_before and update_after, to the downstream node. The update_before message identifies data that was sent earlier and is now “wrong” and must be removed, while update_after carries the “correct” data. On receiving them, the downstream node subtracts update_before from its result and adds update_after.
Returning to E.g.2, the following animation illustrates how the correct result is computed with Retraction.
The update_before message is a critical piece of information: it identifies the “culprit” that makes the current result incorrect. However, the extra message has a cost. In some cases the correct result can be obtained without sending update_before, for example when the downstream node is an UpsertSink (such as MySQL or HBase), where the database can overwrite the result by primary key using the update_after message alone. Whether update_before is sent is decided by the optimizer, and the user does not need to care about it.
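The behavior described for E.g.2 can be reproduced with a small simulation. It is a sketch, not Flink code: the functions `inner_count` and `outer_freq`, the message tags "I", "UB" and "UA", and the three-word input are assumptions chosen to match the narrative above (two Hello records and one World).

```python
from collections import defaultdict


def inner_count(words):
    """Inner aggregation (count per word); emits changelog messages (kind, word, cnt)."""
    counts = defaultdict(int)
    for word in words:
        old = counts[word]
        counts[word] = old + 1
        if old == 0:
            yield ("I", word, 1)
        else:
            yield ("UB", word, old)       # update_before: retract the stale count
            yield ("UA", word, old + 1)   # update_after: the new count


def outer_freq(changelog, honor_retraction=True):
    """Outer aggregation: how many words currently have each count."""
    freq = defaultdict(int)
    for kind, _word, cnt in changelog:
        if kind == "UB":
            if honor_retraction:
                freq[cnt] -= 1
        else:
            freq[cnt] += 1
    return {cnt: n for cnt, n in freq.items() if n != 0}


words = ["Hello", "World", "Hello"]  # assumed input
with_retraction = outer_freq(inner_count(words))
without_retraction = outer_freq(inner_count(words), honor_retraction=False)
```

With retraction, one word ends up with frequency 1 and one with frequency 2. When the update_before messages are ignored, Hello remains counted under frequency 1 as well.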
2.4.2 Update_before Decision
Given the Retraction mechanism and update_before, how does the optimizer decide whether update_before needs to be sent? This section describes how that decision is made.
Step1: determine the Changelog change type for each node
The three most common types of operations in a database are Insert (denoted by [I]), Delete (denoted by [D]) and Update (denoted by [U]). The optimizer first examines each node from the bottom up, determines which type(s) of change it produces, and labels it accordingly.
Returning to E.g.2, the first node, the Source, only produces new data, so it is an Insert and is labeled [I]. The second node computes the inner aggregation and therefore sends update messages, so it is labeled [I, U]. The third node prunes the word field, which is a simple computation that passes the upstream change types through, so it is labeled [I, U]. The fourth node computes the outer aggregation. Because it receives Update messages from upstream, it needs Delete operations to apply those updates correctly, so it is labeled [I, U, D].
Step2: determine the message type sent by each node
Before introducing Step2, let’s look at the representation of the Update message type in Flink. In Flink, Update is represented by update_before (UB) and update_after (UA), where UB messages may not be sent in some cases to improve performance.
In Step1, the optimizer derives the Changelog change types of each node from the bottom up. In this step it first works from the top down, inferring which message types each node requires from its parent, until it reaches the first node that requires no message type from its parent. (Throughout this section, a node’s “parent” is its upstream input and its “child” is its downstream consumer.) It then backtracks, fixing the final implementation of each node and the message types it needs to send.
Returning to E.g.2: since the topmost node is an UpsertSink, it only requires its parent to provide [UA]. The outer Aggregate node, whose input contains Update operations, requires its parent to provide [UB, UA] so that it can update its aggregation state correctly.
Further down, the Calc node passes the [UB, UA] requirement on to its parent, the inner Aggregate node. The inner Aggregate node’s parent is the Source, which produces no Update, so the inner Aggregate does not require the Source to send any [UB/UA].
When the optimizer reaches the Source node, it starts backtracking. If the current node can satisfy the requirement of its child, the optimizer attaches the corresponding label to the node. Otherwise, no plan can be generated. The inner Aggregate can produce UB, so it satisfies its child’s requirement and is labeled [UB, UA]. The label is passed up to the Calc node, which is also labeled [UB, UA]. Finally, because the downstream of the outer Aggregate node only needs the updated message, the outer Aggregate is labeled [UA], meaning that it only sends update_after downstream.
These labels ultimately determine the physical implementation of each operator. The outer Aggregate node receives [UB] from upstream, so its physical implementation uses a Count with Retract, and it sends only update_after to the Sink. The inner Aggregate node receives no [UB] from upstream, so it can be implemented with a Count without Retract, and because it is labeled [UB, UA], it must send update_before downstream.
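The two passes can be sketched for the linear plan of E.g.2 as follows. This is a simplified model under stated assumptions: the plan is a plain list from upstream to downstream, the node names and the per-node rules are hypothetical stand-ins for the optimizer’s real traits, and the check that a node can actually satisfy its child’s requirement is omitted.

```python
# Linear plan of E.g.2, listed from upstream to downstream (names are illustrative)
PLAN = ["Source", "InnerAggregate", "Calc", "OuterAggregate", "UpsertSink"]


def infer_change_types(plan):
    """Step1 (bottom-up): which change types each node produces."""
    produced, incoming = {}, set()
    for node in plan[:-1]:  # the sink itself produces nothing
        if node == "Source":
            kinds = {"I"}
        elif node.endswith("Aggregate"):
            kinds = {"I", "U", "D"} if "U" in incoming else {"I", "U"}
        else:  # Calc passes the upstream change types through
            kinds = set(incoming)
        produced[node] = kinds
        incoming = kinds
    return produced


def infer_update_messages(plan, produced):
    """Step2 (top-down): which update messages each node has to send."""
    sends = {}
    downstream_needs_ub = False
    for i in range(len(plan) - 1, 0, -1):
        node, upstream = plan[i], plan[i - 1]
        if node == "UpsertSink":
            needs_ub = False                      # can overwrite by primary key
        elif node.endswith("Aggregate"):
            needs_ub = "U" in produced[upstream]  # must retract stale input
        else:
            needs_ub = downstream_needs_ub        # Calc forwards the requirement
        if "U" in produced[upstream]:
            sends[upstream] = ["UB", "UA"] if needs_ub else ["UA"]
        else:
            sends[upstream] = []                  # insert-only, no update messages
        downstream_needs_ub = needs_ub
    return sends


produced = infer_change_types(PLAN)
sends = infer_update_messages(PLAN, produced)
```

For this plan the sketch reproduces the labels derived in the text: the inner Aggregate and the Calc send [UB, UA], and the outer Aggregate sends only [UA].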
Flink SQL Internal Optimization
Having described how the Flink SQL engine works, we now briefly review some of the optimizations inside Flink SQL. More information can be found in the Flink Forward Asia 2019 talks.
3.1 BinaryRow
Before Flink 1.9, the data structure passed between operators in the Flink Runtime layer was Row, implemented internally as an Object[]. The problem with this data structure is that it requires extra storage for Object metadata, involves a lot of serialization and deserialization (in particular, the entire Row has to be deserialized even if only a few fields are needed), and requires boxing and unboxing of primitive types. All of this adds considerable performance overhead.
Starting with Flink 1.9, the Blink Planner uses BinaryRow, a binary data structure, to represent records. BinaryRow operates on memory segments with a default size of 32 KB and maps directly onto memory. A BinaryRow is divided into a header, a fixed-length section and a variable-length section. The header stores the Retraction message identifier. The fixed-length section uses eight bytes to record the nullability of the fields and holds all primitive types and other types that can be represented within eight bytes. All other types are stored in the variable-length section, addressed by an offset relative to the start of the row.
Using BinaryRow as the basic data structure of the Blink Planner brings clear benefits. First, BinaryRow is more compact in storage and eliminates the extra overhead. Second, serialization and deserialization become much faster: only the required fields need to be deserialized, located by their offsets, and once Object Reuse is enabled, serialization can be done by a direct memory copy.
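To make the layout concrete, here is a much simplified row format in the same spirit. It is an assumption-laden sketch, not Flink’s actual BinaryRow encoding: one header byte, seven bytes of null bits, one eight-byte slot per field, and strings stored in a trailing variable-length section referenced by (offset, length).

```python
import struct

HEADER_AND_NULLS = 8  # 1 header byte + 7 bytes of null bits (assumed layout)


def write_row(header, fields):
    """Encode ints (fixed-length) and strs (variable-length) into one bytes object."""
    fixed_size = HEADER_AND_NULLS + 8 * len(fields)
    null_bits = 0
    fixed = bytearray()
    var = bytearray()
    for i, value in enumerate(fields):
        if value is None:
            null_bits |= 1 << i
            fixed += struct.pack("<q", 0)
        elif isinstance(value, int):
            fixed += struct.pack("<q", value)
        else:
            data = value.encode("utf-8")
            offset = fixed_size + len(var)      # offset from the start of the row
            fixed += struct.pack("<II", offset, len(data))
            var += data
    head = struct.pack("<B", header) + null_bits.to_bytes(7, "little")
    return bytes(head + fixed + var)


def read_field(row, index, is_string=False):
    """Read a single field by offset without decoding the rest of the row."""
    null_bits = int.from_bytes(row[1:8], "little")
    if null_bits & (1 << index):
        return None
    slot = HEADER_AND_NULLS + 8 * index
    if not is_string:
        return struct.unpack_from("<q", row, slot)[0]
    offset, length = struct.unpack_from("<II", row, slot)
    return row[offset:offset + length].decode("utf-8")


row = write_row(0, [42, "hello", None, 7])
```

Because every field has a fixed slot position, `read_field(row, 3)` jumps straight to the fourth field and never touches the string stored before it.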
3.2 Mini-batch Processing
Flink is a pure stream processing framework, and in theory every newly arrived record triggers a computation. At the implementation level, however, this means that in an aggregation scenario every processed record causes a State read and write plus serialization and deserialization. If a certain amount of data can be buffered in memory, pre-aggregated, and only then used to update State, the cost of State operations drops, the amount of data sent downstream shrinks, and throughput improves. This is the core idea of the Mini-batch optimization. It also reduces the data jitter caused by Retraction in two-level aggregation.
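The saving in state accesses can be illustrated with a toy counter. In this sketch a plain dict stands in for keyed State and batches are cut by record count. Both choices, like the function names, are simplifying assumptions made for the example.

```python
from collections import Counter


def per_record(keys):
    """Update state once for every incoming record."""
    state, accesses = {}, 0
    for key in keys:
        state[key] = state.get(key, 0) + 1       # one read + one write per record
        accesses += 2
    return state, accesses


def mini_batch(keys, batch_size):
    """Buffer records, pre-aggregate them in memory, then update state once per key."""
    state, accesses = {}, 0
    for start in range(0, len(keys), batch_size):
        buffered = Counter(keys[start:start + batch_size])
        for key, partial in buffered.items():
            state[key] = state.get(key, 0) + partial  # one read + one write per key per batch
            accesses += 2
    return state, accesses


keys = ["a", "b", "a", "a", "c", "a", "b", "a"]
state_plain, accesses_plain = per_record(keys)
state_batched, accesses_batched = mini_batch(keys, batch_size=4)
```

Both variants end with the same counts, but the batched one touches state fewer times, and the saving grows with the share of repeated keys inside a batch.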
3.3 Skew Processing
The optimization of data skew falls into two cases, depending on whether the aggregation has DISTINCT (de-duplication) semantics. For data skew in ordinary aggregations, Flink introduces a local-global two-stage optimization, similar to adding a local Combiner in MapReduce. For aggregations with DISTINCT, Flink rewrites the user’s SQL into a two-level aggregation: the first level groups by the original aggregation key combined with a hash of the DISTINCT key, which spreads the skewed data across more groups before the second level produces the final result.
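The local-global case can be sketched as follows. The partitions, the hot key and the function name are made up for illustration, and the shuffle is modeled simply by counting how many partial results leave the local stage.

```python
from collections import Counter


def local_global_count(partitions):
    """Pre-aggregate inside each partition, then merge the partial counts globally."""
    partials = [Counter(p) for p in partitions]       # local stage
    shuffled = sum(len(p) for p in partials)          # partial results sent downstream
    total = Counter()
    for partial in partials:                          # global stage
        total.update(partial)
    return dict(total), shuffled


# One heavily skewed key spread over three upstream partitions
partitions = [["hot"] * 1000 + ["a"], ["hot"] * 1000 + ["b"], ["hot"] * 1000]
result, shuffled = local_global_count(partitions)
raw_records = sum(len(p) for p in partitions)
```

The task that owns the hot key receives three partial counts in place of 3,000 raw records, which is what relieves the skew.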
3.4 Top-N Rewrite
Global sorting is hard to implement in streaming scenarios, but the problem becomes tractable if you only need the Top-N extremes up to the current moment. In traditional databases, however, the SQL for this is ORDER BY combined with LIMIT, and the implementation behind it scans and sorts the whole table before returning the first LIMIT rows. Moreover, ORDER BY with LIMIT cannot express a ranking within each group defined by certain fields. Flink SQL borrows the windowed Top-N syntax used in batch scenarios and uses ROW_NUMBER to express Top-N in streaming scenarios.
E.g.3 The following SQL computes the top 3 shops in each category.
SELECT *
FROM (
SELECT *, -- other columns such as shopId are also available here
ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rowNum
FROM shop_sales)
WHERE rowNum <= 3
In terms of Plan generation, the ROW_NUMBER semantics correspond to an OverAggregate window node followed by a Calc node that filters on the row number. At the implementation level, however, this window node would have to re-sort the historical data held in State for every arriving record, which is clearly not optimal.
We know that in streaming scenarios the best way to maintain max/min values is a min-heap or max-heap of size N. So we add a rule to the optimizer that rewrites the logical nodes generated by ROW_NUMBER into a special Rank node backed by that implementation (this is, of course, only one of the Rank node’s possible implementations). This is the core idea of Top-N Rewrite.
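A size-N heap per partition key can be sketched like this. It is only an illustration of the data structure, not Flink’s Rank operator: the `TopN` class is hypothetical, and the sketch assumes append-only input in which each shop appears once, so updates to an existing shop’s sales are not handled.

```python
import heapq
from collections import defaultdict


class TopN:
    """Keeps the N largest (sales, shop_id) pairs per category in a min-heap."""

    def __init__(self, n):
        self.n = n
        self.heaps = defaultdict(list)  # category -> min-heap of (sales, shop_id)

    def add(self, category, shop_id, sales):
        heap = self.heaps[category]
        if len(heap) < self.n:
            heapq.heappush(heap, (sales, shop_id))
        elif (sales, shop_id) > heap[0]:
            # the new record beats the current N-th place, so evict the smallest
            heapq.heapreplace(heap, (sales, shop_id))

    def top(self, category):
        return sorted(self.heaps[category], reverse=True)


ranking = TopN(3)
for category, shop_id, sales in [
    ("book", "s1", 10), ("book", "s2", 50), ("book", "s3", 30),
    ("book", "s4", 40), ("food", "f1", 5), ("book", "s5", 20),
]:
    ranking.add(category, shop_id, sales)
```

Each arriving record costs at most one heap operation on N elements, so the full history never has to be re-sorted.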
Summary & Futures
Review of this article
- Flink 1.9+ introduces a new architecture for SQL & Table API and unifies the technology stack, a big step toward stream and batch integration.
- We took an in-depth look at the inner workings of the Flink SQL engine and at how much optimization Flink SQL performs while remaining transparent to the user.
Future Work Plan
- From Flink 1.11 onward, Blink Planner will provide production-level support as the default Planner.
- FLIP-95: Redesign of the TableSource & TableSink interfaces to support stream and batch unification, Changelog message streams on the Source side, and the CDC data sources of FLIP-105.
- FLIP-105: CDC support for Flink Table API & SQL.
- FLIP-115: Extends the current FileSystem Connector, which supports only CSV, into a generalized FileSystem Connector unified across stream and batch.
- FLIP-123: Compatibility with Hive DDL and DML, allowing users to run Hive DDL in Flink.