Traditional database SQL was not designed with streaming data in mind. As a result, there are some conceptual differences between traditional database SQL processing and stream processing.

This article introduces the concept of Flink's dynamic tables, which can be understood by analogy with materialized views in traditional databases. It is a translation of the original documentation: ci.apache.org/projects/fl…

Relational queries on data streams

Traditional database SQL and real-time streaming SQL share the same concepts, but their processing differs significantly. Here are some of the differences:

| Traditional database SQL processing | Real-time stream processing |
| --- | --- |
| Table data is bounded | Real-time data is unbounded |
| A batch query can access the complete data set | The complete data set is never available; the query must wait for new data to arrive |
| A query stops once it has finished | A query constantly updates its result table with incoming data and never stops |

Despite these differences, processing streams with relational queries and SQL is not impossible. Advanced relational database systems offer a feature called “materialized views”. A materialized view is defined by a SQL query, just like a regular virtual view.

In contrast to a virtual view, a materialized view caches the result of its query, so the query does not need to be evaluated when the view is accessed. A common challenge with caching is avoiding stale results: a materialized view becomes outdated when the base tables of its defining query are modified. Eager view maintenance is a technique that updates a materialized view as soon as its base tables are updated.
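For reference, here is a minimal sketch of a materialized view in PostgreSQL-style SQL; the table and column names are hypothetical, and note that PostgreSQL refreshes materialized views on demand rather than eagerly:

-- Hypothetical base table of page views.
CREATE TABLE page_views (
  user_name VARCHAR,
  url       VARCHAR
);

-- The materialized view caches the result of its defining query.
CREATE MATERIALIZED VIEW views_per_user AS
SELECT user_name, COUNT(url) AS cnt
FROM page_views
GROUP BY user_name;

-- The cached result becomes stale when page_views is modified; eager view
-- maintenance would perform this refresh automatically on every update.
REFRESH MATERIALIZED VIEW views_per_user;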

The connection between eager view maintenance and SQL queries on streams becomes obvious if we consider the following:

  • Database tables are the result of a stream of INSERT, UPDATE, and DELETE DML statements, often referred to as a changelog stream (see the sketch after this list).
  • Materialized views are defined by a SQL query. To update the view, the query continuously processes the changelog streams of the view's base tables.
  • A materialized view is the result of a streaming SQL query.
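To make the first point concrete, here is a sketch of a changelog stream for a hypothetical clicks table (the running example introduced below); the rows and timestamps are made up, and the user column is quoted because user is a reserved word:

-- The table is the result of applying this changelog stream in order.
INSERT INTO clicks VALUES ('Mary', TIMESTAMP '2019-01-01 12:00:00', './home');
INSERT INTO clicks VALUES ('Bob',  TIMESTAMP '2019-01-01 12:00:00', './cart');
UPDATE clicks SET url = './prod?id=1' WHERE "user" = 'Mary';
DELETE FROM clicks WHERE "user" = 'Bob';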

With these points in mind, we introduce the concept of dynamic tables in the next section.

Dynamic tables and continuous queries

Dynamic tables are the core concept of Flink's Table API and SQL support for streaming data. In contrast to static tables, dynamic tables change over time, but they can be queried just like static tables. Querying a dynamic table yields a continuous query: a continuous query never terminates and produces a dynamic table as its result. The query constantly updates its (dynamic) result table to reflect the changes on its (dynamic) input table. In essence, a continuous query on a dynamic table is very similar to the query that defines a materialized view.

It is worth noting that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input table.

The following diagram shows the relationship between streams, dynamic tables, and continuous queries:

  1. A stream is converted into a dynamic table.
  2. A continuous query is evaluated on the dynamic table, yielding a new dynamic table.
  3. The resulting dynamic table is converted back into a stream.

Note: Dynamic tables are first and foremost a logical concept. Dynamic tables are not necessarily (fully) materialized during query execution.

In the following sections, dynamic tables and continuous queries are explained using a stream of click events with the following schema:

[
  user:  VARCHAR,   // the name of the user
  cTime: TIMESTAMP, // the time when the URL was accessed
  url:   VARCHAR    // the URL that was accessed by the user
]

Defining tables on streams

To process a stream with traditional relational queries, it must first be converted into a table. Conceptually, each new record in the stream is interpreted as an INSERT into the result table. Essentially, the table is built from an insert-only changelog stream.
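In Flink SQL, such a table can be declared over a stream with a DDL statement. The following is a minimal sketch that assumes a recent Flink version and a Kafka topic named clicks; the connector options are illustrative and vary by version and setup:

CREATE TABLE clicks (
  `user` VARCHAR,       -- the name of the user (quoted: reserved word)
  cTime  TIMESTAMP(3),  -- the time when the URL was accessed
  url    VARCHAR        -- the URL that was accessed by the user
) WITH (
  'connector' = 'kafka',  -- read the changelog from a Kafka topic
  'topic' = 'clicks',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);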

The figure below visualizes how the click event stream (left) is transformed into a table (right). The result table continues to grow as more clickstream records are inserted.

Note: A table defined on a stream is not materialized internally.

Continuous queries

A continuous query is evaluated on a dynamic table and produces a new dynamic table as its result. In contrast to a batch query, a continuous query never terminates and updates its result table according to updates on its input table. At any point in time, the result of a continuous query is semantically equivalent to the result of running the same query in batch mode on a snapshot of the input table.

In the following, we show two example queries on a clicks table that is defined on the stream of click events.

The first query is a simple GROUP BY COUNT aggregation: it groups the clicks table by the user field and counts the number of visited URLs. The following figure shows how the query is evaluated over time as the clicks table grows.

When the query starts, the clicks table (left) is empty. The query begins computing the result table when the first row is inserted. After the first row [Mary, ./home] is inserted, the result table (right, top) consists of a single row [Mary, 1]. When the second row [Bob, ./cart] is inserted into the clicks table, the query updates the result table and inserts a new row [Bob, 1]. The third row, [Mary, ./prod?id=1], yields an update of an already computed result row: [Mary, 1] is updated to [Mary, 2]. Finally, when the fourth row is appended to the clicks table, the query inserts a third row [Liz, 1] into the result table.
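In SQL, the first query reads as follows (it appears again in the state-size discussion later in this article):

SELECT user, COUNT(url)
FROM clicks
GROUP BY user;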

The second query is similar to the first, but in addition to the user attribute it also groups the clicks table on an hourly tumbling window before counting the URLs (time-based computations such as windows are based on special time attributes). Again, the figure shows the input and output at different points in time to visualize the changing nature of the dynamic table.

As before, the input clicks table is shown on the left. The query computes results and updates the result table on an hourly basis. The clicks table contains four rows with timestamps between 12:00:00 and 12:59:59. The query computes two result rows (one per user) from this input and appends them to the result table. For the next window, between 13:00:00 and 13:59:59, the clicks table contains three rows, which causes two more rows to be appended to the result table. The result table is updated as more rows are appended to clicks over time.
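A sketch of the second query using Flink's TUMBLE group window (the exact windowing syntax depends on the Flink version):

SELECT
  user,
  TUMBLE_END(cTime, INTERVAL '1' HOUR) AS endT,  -- end timestamp of the hourly window
  COUNT(url) AS cnt                              -- URL count per user and window
FROM clicks
GROUP BY
  user,
  TUMBLE(cTime, INTERVAL '1' HOUR);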

Update and append queries

Although the two example queries appear quite similar (both compute a grouped count aggregation), their internal logic differs significantly:

  • The first query updates previously emitted results, i.e., the changelog stream that defines the result table contains INSERT and UPDATE changes (sketched after this list).
  • The second query only appends to the result table, i.e., the changelog stream of the result table contains only INSERT changes.
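For illustration, the changelog stream of the first query's result table can be written using Flink's changelog row kinds (+I insert, -U update-before, +U update-after); the rows follow the earlier example:

+I [Mary, 1]   -- first click by Mary: insert a new result row
+I [Bob, 1]    -- first click by Bob: insert a new result row
-U [Mary, 1]   -- Mary's second click: retract the old count ...
+U [Mary, 2]   -- ... and emit the updated count
+I [Liz, 1]    -- first click by Liz: insert a new result row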

Whether a query produces an append-only table or an updated table makes a difference in practice:

  • Queries that produce UPDATE changes usually have to maintain more state.
  • Converting an append-only table into a stream is different from converting an updated table into a stream (see “Converting a table into a stream” below).

Query restrictions

Many, but not all, semantically valid queries can be evaluated as continuous queries on streams. Some queries are too expensive to compute, either because of the size of the state they must maintain or because computing their updates is too expensive.

  • State size: Continuous queries are evaluated on unbounded streams and are often expected to run around the clock for weeks or months. Hence, the total amount of data processed by a continuous query can be very large. To update previously emitted results, a query may need to maintain all previously emitted rows. For example, the first example query needs to store the URL count for each user so that it can increment the count and emit a new result when the input table receives a new row. If only registered users are counted, the number of counts to maintain may not be too high. However, if unregistered users are each assigned a unique user name, the number of counts to maintain grows over time and may eventually cause the query to fail.

    SELECT user, COUNT(url)
    FROM clicks
    GROUP BY user;
  • Computing updates: Some queries require recomputing and updating a large portion of the emitted result rows even when only a single input record is added or updated. Clearly, such queries are not well suited for execution as continuous queries. An example is the following query, which computes a RANK for each user based on the time of the last click. As soon as the clicks table receives a new row, the user's lastAction is updated and a new rank must be computed. However, since two rows cannot have the same rank, all lower-ranked rows need to be updated as well.

    SELECT user, RANK() OVER (ORDER BY lastAction)
    FROM (
      SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
    );

The Query Configuration page discusses parameters that control the execution of continuous queries. Some parameters can be used to trade the size of the maintained state for result accuracy.
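As one concrete example of such a parameter, recent Flink versions expose a state time-to-live option; the following SQL-client statement is a sketch (the option name assumes Flink 1.13+):

-- Allow Flink to drop per-key state that has been idle for one hour,
-- trading bounded state size for potentially less accurate results.
SET 'table.exec.state.ttl' = '1 h';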

Converting a table into a stream

Like a regular database table, a dynamic table can be continuously modified by INSERT, UPDATE, and DELETE changes. It might be a table with a single row that is constantly updated, an insert-only table without UPDATE and DELETE modifications, or anything in between.

These changes need to be encoded when a dynamic table is converted into a stream or written to an external system. Flink's Table API and SQL support three ways to encode the changes of a dynamic table:

  • Append-only stream: A dynamic table that is modified only by INSERT changes can be converted into a stream by simply emitting the inserted rows.

  • Retract stream: A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT change as an add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the previous row plus an add message for the new row. The following diagram visualizes the conversion of a dynamic table into a retract stream:

  • Upsert stream: An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with a unique key is converted into a stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The operator consuming the stream needs to know the unique key attribute in order to process the messages correctly. The main difference from a retract stream is that UPDATE changes are encoded with a single message, which is more efficient. The following figure shows the conversion of a dynamic table into an upsert stream.

The API for converting a dynamic table into a DataStream is discussed on the Common Concepts page. Note that only append-only and retract streams are supported when converting a dynamic table into a DataStream. The TableSink interface for emitting a dynamic table to an external system is discussed on the TableSources and TableSinks page.