Both Flink temporal table functions and versioned tables record how the data for a key changes over time. In the new version, versioned tables are used instead of temporal table functions.
Temporal table functions can only be defined on append-only streams; they do not support changelog input. They also cannot be created using SQL DDL.
Temporal table functions
Create a temporal table function:
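// "update_time" is the time attribute, "currency" is the primary key
TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");
// Register the function under the name "rates" so SQL queries can call it
tEnv.registerFunction("rates", rates);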
Temporal table function join:
SELECT * FROM currency_rates;

update_time   currency   rate
===========   ========   ====
09:00:00      Yen        102
09:00:00      Euro       114
09:00:00      USD        1
11:15:00      Euro       119
11:49:00      Pounds     108

SELECT * FROM orders;

order_time   amount   currency
==========   ======   ========
10:15        2        Euro
10:30        1        USD
10:32        50       Yen
10:52        3        Euro
11:04        5        USD
For each order, the join looks up the rate that was valid at that order's order_time, so orders placed at different times can be converted with different rates.
SELECT
SUM(amount * rate) AS amount
FROM
orders,
LATERAL TABLE (rates(order_time))
WHERE
rates.currency = orders.currency
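The lookup behind this join can be sketched in plain Java, without any Flink dependency. This is only an illustration of the semantics, not how Flink implements it: the class name TemporalJoinSketch and its methods are hypothetical, and it assumes that a rate becomes valid at its update_time and stays valid until the next update for the same currency.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that mimics a temporal join lookup on the sample data.
public class TemporalJoinSketch {
    // currency -> (update_time -> rate), each history sorted by update_time
    private final Map<String, TreeMap<LocalTime, Long>> versions = new HashMap<>();

    public void addRate(String updateTime, String currency, long rate) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>())
                .put(LocalTime.parse(updateTime), rate);
    }

    // Latest rate whose update_time <= orderTime, or null if none exists yet.
    public Long rateAsOf(String currency, LocalTime orderTime) {
        TreeMap<LocalTime, Long> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<LocalTime, Long> entry = history.floorEntry(orderTime);
        return entry == null ? null : entry.getValue();
    }

    // Equivalent of SUM(amount * rate) over the sample orders.
    public static long totalAmount() {
        TemporalJoinSketch rates = new TemporalJoinSketch();
        rates.addRate("09:00:00", "Yen", 102);
        rates.addRate("09:00:00", "Euro", 114);
        rates.addRate("09:00:00", "USD", 1);
        rates.addRate("11:15:00", "Euro", 119);
        rates.addRate("11:49:00", "Pounds", 108);

        // order_time, amount, currency
        Object[][] orders = {
            {"10:15", 2L, "Euro"},
            {"10:30", 1L, "USD"},
            {"10:32", 50L, "Yen"},
            {"10:52", 3L, "Euro"},
            {"11:04", 5L, "USD"},
        };

        long total = 0;
        for (Object[] order : orders) {
            LocalTime orderTime = LocalTime.parse((String) order[0]);
            Long rate = rates.rateAsOf((String) order[2], orderTime);
            if (rate != null) { // inner join: orders without a valid rate are dropped
                total += (Long) order[1] * rate;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalAmount()); // prints 5676
    }
}
```

Both Euro orders (10:15 and 10:52) fall before the 11:15 update, so they are converted at 114 rather than 119.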
Versioned tables
Versioned tables also hold time-dependent data, but they can be created from either append-only or updating (changelog) data. A versioned table must have a primary key and an event-time field.
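Why both a primary key and a time field are needed can be seen in a small plain-Java model. This is a sketch under assumptions, not Flink code: VersionedTableSketch, upsert, delete and valueAsOf are hypothetical names, and the 12:00:00 delete is made-up data added only to show that updating input can also retract a key.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical model: the primary key selects a history, the time field orders its versions.
public class VersionedTableSketch {
    // primary key -> (event time -> value); an empty Optional marks a delete
    private final Map<String, TreeMap<LocalTime, Optional<Long>>> history = new HashMap<>();

    public void upsert(String key, String time, long value) {
        history.computeIfAbsent(key, k -> new TreeMap<>())
               .put(LocalTime.parse(time), Optional.of(value));
    }

    public void delete(String key, String time) {
        history.computeIfAbsent(key, k -> new TreeMap<>())
               .put(LocalTime.parse(time), Optional.empty());
    }

    // Value of the version that was valid for `key` at `time`, if any.
    public Optional<Long> valueAsOf(String key, String time) {
        TreeMap<LocalTime, Optional<Long>> versions = history.get(key);
        if (versions == null) {
            return Optional.empty();
        }
        Map.Entry<LocalTime, Optional<Long>> entry = versions.floorEntry(LocalTime.parse(time));
        return entry == null ? Optional.empty() : entry.getValue();
    }

    public static void main(String[] args) {
        VersionedTableSketch rates = new VersionedTableSketch();
        rates.upsert("Euro", "09:00:00", 114);
        rates.upsert("Euro", "11:15:00", 119);
        rates.delete("Euro", "12:00:00"); // hypothetical delete, not in the sample data

        System.out.println(rates.valueAsOf("Euro", "10:15:00")); // Optional[114]
        System.out.println(rates.valueAsOf("Euro", "11:30:00")); // Optional[119]
        System.out.println(rates.valueAsOf("Euro", "12:30:00")); // Optional.empty
    }
}
```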
Creating versioned tables
Defined as a table source
CREATE TABLE products (
    product_id    STRING,
    product_name  STRING,
    price         DECIMAL(32, 2),
    update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    PRIMARY KEY (product_id) NOT ENFORCED,
    WATERMARK FOR update_time AS update_time
) WITH (...);
Defined from a query
currency_rates data:

(changelog kind)   update_time   currency   rate
================   ===========   ========   ====
+(INSERT)          09:00:00      Yen        102
+(INSERT)          09:00:00      Euro       114
+(INSERT)          09:00:00      USD        1
+(INSERT)          11:15:00      Euro       119
+(INSERT)          11:49:00      Pounds     108
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency ORDER BY update_time DESC) AS rownum
    FROM currency_rates)
WHERE rownum = 1;
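PARTITION BY currency: the currency column is used as the primary key of the resulting versioned view. ORDER BY update_time DESC together with rownum = 1 keeps, for each currency, the row with the latest update_time, and update_time serves as the view's time field.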
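The effect of this deduplication on the append-only input can be sketched in plain Java. The sketch assumes rows arrive ordered by update_time and labels the output with Flink's row-kind short strings (+I insert, -U update-before, +U update-after). The class DedupChangelogSketch is hypothetical, and the exact sequence a real job emits is an assumption here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "keep the latest row per currency" as a changelog.
public class DedupChangelogSketch {
    // Each row is {update_time, currency, rate}; rows must be ordered by update_time.
    public static List<String> toChangelog(String[][] rows) {
        Map<String, String[]> latest = new HashMap<>(); // currency -> current row
        List<String> out = new ArrayList<>();
        for (String[] row : rows) {
            String[] previous = latest.put(row[1], row);
            if (previous == null) {
                out.add("+I " + String.join(" ", row));      // first version of this key
            } else {
                out.add("-U " + String.join(" ", previous)); // retract the old version
                out.add("+U " + String.join(" ", row));      // emit the new version
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] currencyRates = {
            {"09:00:00", "Yen", "102"},
            {"09:00:00", "Euro", "114"},
            {"09:00:00", "USD", "1"},
            {"11:15:00", "Euro", "119"},
            {"11:49:00", "Pounds", "108"},
        };
        for (String change : toChangelog(currencyRates)) {
            System.out.println(change);
        }
    }
}
```

Only Euro appears twice in the sample data, so it is the only key that produces a -U/+U pair. Every other currency yields a single +I row.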