Both Flink temporal table functions and versioned tables record the value of each key (id) as it changes over time. In newer Flink versions, versioned tables replace temporal table functions.

Temporal table functions can only be defined on append-only streams; changelog input is not supported. They also cannot be created with SQL DDL.

Temporal tables

Create a temporal table function:

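// update_time is the time attribute, currency is the primary key.
TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");

// Register the function under the name "rates" so SQL queries can call it.
tEnv.registerFunction("rates", rates);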

Temporal table function join:

SELECT * FROM currency_rates;
update_time   currency   rate
============= =========  ====
09:00:00      Yen        102
09:00:00      Euro       114
09:00:00      USD        1
11:15:00      Euro       119
11:49:00      Pounds     108

SELECT * FROM orders;
order_time amount currency
========== ====== =========
10:15        2    Euro
10:30        1    USD
10:32       50    Yen
10:52        3    Euro
11:04        5    USD

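During the join, each order is matched with the rate that was valid in the temporal table at that order's order_time, so orders in the same currency can be converted at different rates depending on when they were placed.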

SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
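
To make the lookup rule concrete, here is a minimal plain-Java sketch (no Flink dependency) that replays the sample data above. It assumes the join picks, per currency, the latest rate whose update_time is not later than the order's order_time. The class and method names (TemporalJoinSketch, rateAt, convert, totalAmount) are hypothetical and only illustrate the semantics, not how Flink implements the join.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the lookup rule; this is not Flink API code.
class TemporalJoinSketch {

    // currency -> (update_time -> rate), kept sorted by update_time.
    static final Map<String, TreeMap<LocalTime, Long>> RATES = new HashMap<>();

    static void addRate(String updateTime, String currency, long rate) {
        RATES.computeIfAbsent(currency, k -> new TreeMap<>())
             .put(LocalTime.parse(updateTime), rate);
    }

    static {
        // Rows of currency_rates from the example above.
        addRate("09:00:00", "Yen", 102);
        addRate("09:00:00", "Euro", 114);
        addRate("09:00:00", "USD", 1);
        addRate("11:15:00", "Euro", 119);
        addRate("11:49:00", "Pounds", 108);
    }

    // Latest rate with update_time <= time, or null if no version exists yet.
    static Long rateAt(String currency, String time) {
        TreeMap<LocalTime, Long> versions = RATES.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<LocalTime, Long> entry = versions.floorEntry(LocalTime.parse(time));
        return entry == null ? null : entry.getValue();
    }

    // amount * rate for one order; 0 when there is no matching rate
    // (an inner join would simply emit no row for such an order).
    static long convert(String orderTime, long amount, String currency) {
        Long rate = rateAt(currency, orderTime);
        return rate == null ? 0L : amount * rate;
    }

    // Equivalent of SUM(amount * rate) over the sample orders.
    static long totalAmount() {
        return convert("10:15", 2, "Euro")
             + convert("10:30", 1, "USD")
             + convert("10:32", 50, "Yen")
             + convert("10:52", 3, "Euro")
             + convert("11:04", 5, "USD");
    }

    public static void main(String[] args) {
        System.out.println(totalAmount()); // prints 5676
    }
}
```

With the sample data, both Euro orders are converted at 114, because the 119 rate only takes effect at 11:15:00, after the last Euro order.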

Versioned tables

Versioned tables also hold time-dependent data, but they can be created from either append-only or updating (changelog) data. A versioned table must have a primary key and a time attribute.

Create versioned tables

Defined in a table source

CREATE TABLE products (
	product_id    STRING,
	product_name  STRING,
	price         DECIMAL(32, 2),
	update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
	PRIMARY KEY (product_id) NOT ENFORCED,
	WATERMARK FOR update_time AS update_time
) WITH (...);

Defined by a query

currency_rates data:

(changelog kind)  update_time  currency  rate
================  ===========  ========  ====
+(INSERT)         09:00:00     Yen       102
+(INSERT)         09:00:00     Euro      114
+(INSERT)         09:00:00     USD       1
+(INSERT)         11:15:00     Euro      119
+(INSERT)         11:49:00     Pounds    108

CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency ORDER BY update_time DESC) AS rownum
      FROM currency_rates)
WHERE rownum = 1;

PARTITION BY currency: the partition key (currency) is used as the primary key of the resulting versioned table.
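
As a rough illustration of what "versioned by primary key" means for the sample rows, the plain-Java sketch below groups the changelog by currency and derives the period during which each rate is the current version. It assumes a version stays valid from its update_time until the next row for the same key arrives. The class name VersionedRatesSketch, the method versionsByKey, and the "open" end marker are hypothetical; this does not use or reproduce Flink internals.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; not Flink API code.
class VersionedRatesSketch {

    // Changelog rows of currency_rates: {update_time, currency, rate},
    // already ordered by update_time as in the example above.
    static final String[][] CHANGELOG = {
        {"09:00:00", "Yen", "102"},
        {"09:00:00", "Euro", "114"},
        {"09:00:00", "USD", "1"},
        {"11:15:00", "Euro", "119"},
        {"11:49:00", "Pounds", "108"},
    };

    // For each key, list its versions as "[from, to) rate=R".
    static Map<String, List<String>> versionsByKey() {
        // Group rows by the primary key (currency), keeping arrival order.
        Map<String, List<String[]>> grouped = new LinkedHashMap<>();
        for (String[] row : CHANGELOG) {
            grouped.computeIfAbsent(row[1], k -> new ArrayList<>()).add(row);
        }

        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String[]>> e : grouped.entrySet()) {
            List<String[]> rows = e.getValue();
            List<String> versions = new ArrayList<>();
            for (int i = 0; i < rows.size(); i++) {
                String from = rows.get(i)[0];
                // The next row for the same key closes the current version.
                String to = (i + 1 < rows.size()) ? rows.get(i + 1)[0] : "open";
                versions.add("[" + from + ", " + to + ") rate=" + rows.get(i)[2]);
            }
            result.put(e.getKey(), versions);
        }
        return result;
    }

    public static void main(String[] args) {
        versionsByKey().forEach((key, versions) ->
            System.out.println(key + " -> " + versions));
    }
}
```

In this sketch only Euro ends up with two versions (114 until 11:15:00, then 119); every other currency has a single, still-open version.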