Flink Table API & SQL Programming Guide (1) introduced some basic concepts and general APIs of Flink Table API & SQL. This article explains the basic concepts of stream processing with Flink Table API & SQL in more depth. Flink Table API & SQL unifies stream and batch processing, which means that a query written with the Table API or SQL has the same semantics whether its input is a bounded batch or an unbounded stream. However, since SQL was originally designed for batch processing, relational queries on unbounded streams behave differently from relational queries on bounded data, and this article therefore focuses on dynamic tables.

Dynamic table

SQL and relational algebra were not originally designed for stream processing, so there are some differences between SQL and stream processing. Flink nevertheless implements SQL queries on unbounded data streams.

Relational queries on data streams

Traditional relational algebra (SQL) and stream processing differ in data input, execution, and output of results as follows:

| The difference | Relational algebra / SQL | Stream processing |
| --- | --- | --- |
| Data input | A bounded (multi-)set of tuples, i.e., tables | Unbounded data streams |
| Execution | Batch: a query is executed over the complete input data | A query cannot access all input data at once and must wait for data to arrive on the stream |
| Result output | A fixed-size result is produced once query processing completes | Previous results are continuously updated and never final |

Despite these differences, SQL and stream processing are not incompatible. Some advanced relational databases offer materialized views. Like a regular view, a materialized view is defined by a query; unlike a regular view, it caches the result of the query, so accessing the materialized view does not require re-executing the query. A common challenge of caching is preventing stale results: the cached result becomes outdated when the base tables of the defining query change. Eager View Maintenance is a technique that updates a materialized view as soon as its base tables are updated.

The connection between Eager View Maintenance and SQL queries on streams becomes obvious if you consider the following:

  • A database table is the result of a stream of INSERT, UPDATE, and DELETE DML statements; such a stream is often referred to as a changelog stream.
  • A materialized view is defined by a query. To keep the view up to date, the query continuously processes the changelog streams of the view's base tables.
  • The materialized view is therefore the result of a streaming SQL query.

Dynamic tables and continuous queries

Dynamic tables are the core concept behind Flink Table API & SQL's support for streaming data. In contrast to the static tables of batch processing, dynamic tables change over time. Querying a dynamic table yields a Continuous Query: the query never terminates, its result is again a dynamic table, and the result table is continuously updated to reflect changes on the query's input table.

The following diagram shows the relationship between streams, dynamic tables, and Continuous Query:

  • 1. A stream is converted into a dynamic table
  • 2. A continuous query is evaluated on the dynamic table, producing a new dynamic table
  • 3. The resulting dynamic table is converted back into a stream

Dynamic tables are a logical concept and are not necessarily fully materialized during query execution.

Defining a table on a stream

Running SQL queries on a data stream requires converting the stream into a table. Conceptually, each record of the stream is interpreted as an INSERT into the result table (for a stream that contains only insert operations), as shown below:
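As a minimal sketch (assuming a recent Flink release; the example data and class names are hypothetical), the following converts a DataStream of click events into a Table, where each stream record is interpreted as an INSERT into the dynamic table:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class ClickStreamToTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical click stream of (user, url) records
        DataStream<Tuple2<String, String>> clicks = env.fromElements(
                Tuple2.of("Mary", "./home"),
                Tuple2.of("Bob", "./cart"),
                Tuple2.of("Mary", "./prod?id=1"));

        // Each incoming record becomes an INSERT into the dynamic table
        Table clicksTable = tableEnv.fromDataStream(clicks, $("user"), $("url"));
        tableEnv.createTemporaryView("clicks", clicksTable);

        clicksTable.printSchema();
    }
}
```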

Continuous queries

  • Group aggregation

A continuous query on a dynamic table produces a new dynamic table as its result. In contrast to a batch query, a continuous query never terminates and updates its previous results as input data arrives. In the following example, a stream of click events is processed with a grouped aggregation, as shown below:

The figure above shows a stream of user click events evaluated with a group aggregation. When the first record [Mary, ./home] arrives, it is computed immediately and the result [Mary, 1] is emitted. When [Bob, ./cart] arrives, it is likewise computed immediately, and the results [Mary, 1], [Bob, 1] are emitted. When [Mary, ./prod?id=1] arrives, it is computed immediately, yielding [Mary, 2], [Bob, 1]. As you can see, the group aggregation is applied to all data seen so far and updates the previously emitted results.
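Continuing the stream-to-table sketch above, a minimal version of this grouped-aggregation query could look as follows (the view name `clicks` comes from the earlier sketch; `user` is escaped because it is a reserved SQL keyword):

```java
// Continuous query: a running click count per user. Every input record
// triggers a computation, and a previously emitted result row for the
// same user is updated rather than appended.
Table userCounts = tableEnv.sqlQuery(
        "SELECT `user`, COUNT(url) AS cnt " +
        "FROM clicks " +
        "GROUP BY `user`");
```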

  • Window aggregation

The previous example showed group aggregation; this one shows window aggregation: counting user clicks in a one-hour tumbling window (Tumble Window), as shown below:

As shown above: ctime indicates the time when the event occurred. There are four rows of data in the hour [12:00:00, 12:59:59], three rows in the hour [13:00:00, 13:59:59], and four rows in the hour [14:00:00, 14:59:59].

As can be seen: for the hour [12:00:00, 12:59:59], the results [Mary, 13:00:00, 3] and [Bob, 13:00:00, 1] are appended to the result table. For the hour [13:00:00, 13:59:59], the computed results are [Bob, 14:00:00, 1] and [Liz, 14:00:00, 2]; these are likewise appended to the result table, and the data of previous windows is not updated. The characteristic of window aggregation is therefore that only the data belonging to a window is computed, and the result is appended to the result table.
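A sketch of such a windowed query, again building on the `clicks` view and assuming it additionally exposes an event-time attribute `ctime` (the legacy group-window syntax is used here; newer Flink versions also offer windowing table-valued functions):

```java
// Tumbling-window aggregation: one result row per user and per one-hour
// window; results are appended once the window fires and never updated.
Table windowedCounts = tableEnv.sqlQuery(
        "SELECT `user`, " +
        "       TUMBLE_END(ctime, INTERVAL '1' HOUR) AS endT, " +
        "       COUNT(url) AS cnt " +
        "FROM clicks " +
        "GROUP BY `user`, TUMBLE(ctime, INTERVAL '1' HOUR)");
```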

  • Similarities and differences between group aggregation and window aggregation
| To compare | Group aggregation | Window aggregation |
| --- | --- | --- |
| Output mode | Eager: a result is emitted for every input record | Results are emitted when the window triggers |
| Output volume | Each key outputs N results (one per update) | One result per key per window |
| Output stream | Update stream | Append stream |
| State cleanup | State grows indefinitely (needs idle-state retention) | State of expired windows is cleaned up promptly |
| Output target | Must support update operations (Kudu, HBase, MySQL, etc.) | Does not need to support update operations |

Update and append queries

The two examples above illustrate update and append queries, respectively. The first case, the grouped aggregation, produces results that update the previous results; its result table involves INSERT and UPDATE operations.

The second example, the window aggregation, only appends results to the result table, which therefore involves only INSERT operations.

Whether a query produces an append-only table or an updated table makes a difference in two respects:

  • Queries that produce an updated table (that is, queries that update previously emitted results) usually need to maintain more state; the sketch after this list shows one way to bound it
  • Converting an updated table into a stream requires a different encoding than converting an append-only table (see the next section)
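For update queries such as the grouped aggregation above, Flink can bound the state by expiring entries for keys that have been idle for some time. A minimal sketch, assuming a recent Flink version where TableConfig exposes setIdleStateRetention (older releases used setIdleStateRetentionTime instead):

```java
import java.time.Duration;

// Expire per-key state that has not been accessed for 24 hours. Results
// for keys whose state was dropped may become inaccurate afterwards.
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(24));
```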

Table to stream conversion

Dynamic tables are continuously changed by INSERT, UPDATE, and DELETE operations. When a dynamic table is converted into a stream or written out to an external storage system, these changes need to be encoded. Flink Table API & SQL supports three ways to encode the changes:

  • Append-only stream

A dynamic table that is only modified by INSERT operations can be converted into a stream by simply emitting the inserted rows.

  • Retract stream

A retract stream contains two types of messages: add messages and retract messages. A dynamic table is converted into a retract stream by encoding INSERT changes as add messages and DELETE changes as retract messages. An UPDATE change is encoded as a retract message for the old (updated) row, followed by an add message for the new row, as shown in the following figure:

  • Upsert stream

An upsert stream contains two types of messages: upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a unique (possibly composite) primary key. A dynamic table with a unique primary key is converted into a stream by encoding the changes caused by INSERT and UPDATE operations as upsert messages, and the changes caused by DELETE operations as delete messages. Compared with a retract stream, an upsert stream encodes an UPDATE with a single upsert message, whereas a retract stream first encodes the old row as a retract message and then the new row as an add message, i.e., two messages per update. Upsert streams are therefore more efficient. The details are shown in the figure below:

Note: only append and retract streams are supported when converting a dynamic table into a DataStream. Append, retract, and upsert modes are all supported when writing a dynamic table to an external system.
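Continuing the earlier sketches, converting the two example result tables into data streams with the legacy bridge API might look as follows (toAppendStream and toRetractStream are deprecated in newer Flink versions in favor of toDataStream and toChangelogStream):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// Append stream: valid for the window aggregation, whose result table
// is only ever modified by INSERT operations.
DataStream<Row> appendStream =
        tableEnv.toAppendStream(windowedCounts, Row.class);

// Retract stream: required for the grouped aggregation. The Boolean flag
// is true for add messages and false for retract messages.
DataStream<Tuple2<Boolean, Row>> retractStream =
        tableEnv.toRetractStream(userCounts, Row.class);

appendStream.print();
retractStream.print();
```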

Summary

This article introduced the concept of dynamic tables in Flink Table API & SQL. It first covered the basic concepts of dynamic tables, then described how to define a table on a stream and pointed out the similarities and differences between group aggregation and window aggregation. Finally, it introduced the three encodings used when converting a table into a stream or writing it to an external system.
