In the Table API and SQL, Flink represents data as tables: stream data is a table that is appended to without end, and batch data is a finite table. Such continuously growing tables are called dynamic tables.
Dynamic table query
Dynamic table query results can be divided into two types:
- Update queries: when new data arrives, the result is recomputed over all historical data and rows already in the result table are updated. These queries must maintain more state. This is what happens when no window is defined.
- Append queries: when new data arrives, only the newly produced results are computed and appended to the result table; rows already emitted are never changed.
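To make the difference concrete, here is a minimal plain-Java sketch (not Flink code) of both query types computing a per-user count. The class DynamicTableQueries, its methods and the string output format are hypothetical; the +I/-U/+U prefixes borrow the shorthand Flink uses for changelog rows. For simplicity, the append variant assumes rows arrive in timestamp order, which stands in for the watermark that tells Flink a window is complete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DynamicTableQueries {

    // Update query: COUNT(*) per user with no window. Each new row can change a
    // result row that was already emitted, so one counter per user is kept forever.
    static List<String> updateQuery(List<String> users) {
        Map<String, Long> state = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (String user : users) {
            Long old = state.get(user);
            long updated = (old == null) ? 1 : old + 1;
            if (old == null) {
                changelog.add("+I " + user + "," + updated);   // first result for this user
            } else {
                changelog.add("-U " + user + "," + old);       // retract the old result
                changelog.add("+U " + user + "," + updated);   // emit the new result
            }
            state.put(user, updated);
        }
        return changelog;
    }

    // Append query: COUNT(*) per user per tumbling window of sizeMillis.
    // Assumes rows arrive in timestamp order: when a row opens a new window, the
    // previous window is complete, so its rows are appended once and its state is dropped.
    static List<String> appendQuery(List<String> users, List<Long> times, long sizeMillis) {
        List<String> result = new ArrayList<>();
        Map<String, Long> window = new TreeMap<>();
        long windowStart = Long.MIN_VALUE;
        for (int i = 0; i < users.size(); i++) {
            long ts = times.get(i);
            long start = ts - Math.floorMod(ts, sizeMillis);
            if (start != windowStart) {
                emit(result, windowStart, window);
                windowStart = start;
            }
            window.merge(users.get(i), 1L, Long::sum);
        }
        emit(result, windowStart, window);
        return result;
    }

    // Appends one row per user for a finished window, then clears the window state.
    private static void emit(List<String> result, long start, Map<String, Long> counts) {
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            result.add("+I " + start + "," + e.getKey() + "," + e.getValue());
        }
        counts.clear();
    }
}
```

Note how updateQuery keeps a counter for every user it has ever seen, while appendQuery clears its state each time a window is emitted; this is the state difference described above.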
Time field & Watermarks
When processing an infinitely growing dynamic table with neither windows nor watermarks defined, the state kept for the unbounded input can grow without limit and eventually cause an out-of-memory (OOM) error. Watermarks are defined in the Table API and SQL as follows:
Defining event time
Defined in the table DDL
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- Declare user_action_time as the event-time attribute, with watermarks that trail it by 5 seconds (tolerating up to 5 seconds of out-of-order data)
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
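The WATERMARK clause above can be approximated in plain Java. This is a simplified sketch, assuming the watermark is the largest value of user_action_time - 5 s seen so far; Flink itself emits watermarks periodically rather than after every row, and WatermarkSketch and its methods are hypothetical names.

```java
class WatermarkSketch {
    private final long delayMillis;
    private long watermark = Long.MIN_VALUE;

    WatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Evaluates "user_action_time - INTERVAL '5' SECOND" for a row and keeps the
    // largest value seen, so the watermark never moves backwards.
    long onEvent(long eventTimeMillis) {
        watermark = Math.max(watermark, eventTimeMillis - delayMillis);
        return watermark;
    }

    // An event-time window [start, end) fires once the watermark has reached its
    // last timestamp, end - 1: rows up to that time are assumed to have arrived.
    boolean canFire(long windowEndMillis) {
        return watermark >= windowEndMillis - 1;
    }
}
```

With a 5-second delay, a row at 10 s sets the watermark to 5 s, an out-of-order row at 8 s leaves it at 5 s, and a row at 16 s moves it to 11 s, at which point the window [0 s, 10 s) can be closed.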
The time field can be of type TIMESTAMP or TIMESTAMP_LTZ:
- TIMESTAMP: a timestamp without time zone
- TIMESTAMP_LTZ: a timestamp with local time zone
-- Set the session time zone (used to interpret and display TIMESTAMP_LTZ values)
SET 'table.local-time-zone' = 'UTC';
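The difference between the two types can be illustrated with java.time, as an analogy rather than Flink's internal representation: TIMESTAMP behaves like a LocalDateTime (a wall-clock value with no zone), while TIMESTAMP_LTZ behaves like an Instant that is rendered in the session time zone set by table.local-time-zone. The class and method names below are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

class TimestampTypes {

    // TIMESTAMP_LTZ is an absolute instant; the session time zone
    // (table.local-time-zone) only decides how it is shown.
    static LocalDateTime renderLtz(Instant value, String sessionZone) {
        return LocalDateTime.ofInstant(value, ZoneId.of(sessionZone));
    }

    // A TIMESTAMP is a wall-clock value with no zone; turning it into an instant
    // requires choosing a zone, here the session time zone.
    static Instant timestampToLtz(LocalDateTime value, String sessionZone) {
        return value.atZone(ZoneId.of(sessionZone)).toInstant();
    }
}
```

The same instant 2021-04-15T00:00:00Z renders as 00:00 under UTC and as 08:00 under Asia/Shanghai, whereas a TIMESTAMP value looks the same whatever the session zone is.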
Defined when converting a DataStream to a Table
stream = inputStream.assignTimestampsAndWatermarks(...);
Table table = tEnv.fromDataStream(stream, $("user_name"), ..., $("time").rowtime(), ...);
Defined in a TableSource
Implement getRowtimeAttributeDescriptors() (from the legacy DefinedRowtimeAttributes interface) in the TableSource and use it to declare which field is the rowtime attribute.
Querying with watermarks (event-time windows)
SQL:
select TUMBLE_START(`time`, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
from table_name
group by TUMBLE(`time`, INTERVAL '10' MINUTE)
Table API:
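// Needs Tumble and the static Expressions.* helpers ($, lit) from org.apache.flink.table.api
Table result = table
    .window(Tumble.over(lit(10).minutes()).on($("time")).as("myWindow"))
    .groupBy($("myWindow"))
    .select($("myWindow").start(), $("user_name").count().distinct());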
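The tumbling-window query above (10-minute windows, distinct users per window) can be sketched in plain Java. The sketch assumes windows aligned to the epoch with no offset, which is how Flink aligns tumbling windows by default; unlike Flink, it keeps every window in memory instead of discarding a window's state once the watermark closes it. TumblingCountDistinct and its methods are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class TumblingCountDistinct {

    // Start of the tumbling window that contains ts (what TUMBLE_START returns),
    // for windows aligned to the epoch with no offset.
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    // COUNT(DISTINCT user) per window, keyed by window start in ascending order.
    static Map<Long, Integer> countDistinct(List<String> users, List<Long> times, long sizeMillis) {
        Map<Long, Set<String>> perWindow = new TreeMap<>();
        for (int i = 0; i < users.size(); i++) {
            long start = windowStart(times.get(i), sizeMillis);
            perWindow.computeIfAbsent(start, k -> new HashSet<>()).add(users.get(i));
        }
        Map<Long, Integer> result = new TreeMap<>();
        perWindow.forEach((windowStartMillis, distinctUsers) -> result.put(windowStartMillis, distinctUsers.size()));
        return result;
    }
}
```

For example, rows from alice at 0 min and 5 min, bob at 1 min and carol at 11:40 produce two distinct users in the window starting at 0 and one in the window starting at 10 min.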
Defining processing time
Defined in the table DDL
CREATE TABLE user_actions (
user_name STRING,
data STRING,
-- Define user_action_time as processing time
user_action_time AS PROCTIME()
) WITH (
...
);
Defined when converting a DataStream to a Table
// Processing time needs no timestamp or watermark assignment; proctime() appends a new field
Table table = tEnv.fromDataStream(inputStream, $("user_name"), $("data"), $("user_action_time").proctime());
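Because processing time is read from the clock when a row is processed, the same row can land in different windows depending on when it arrives, so results are not reproducible. The plain-Java sketch below makes that explicit by injecting a java.time.Clock; ProcTimeSketch is a hypothetical name, and real PROCTIME() values come from the local system clock of the machine running the operator.

```java
import java.time.Clock;

class ProcTimeSketch {
    private final Clock clock;

    ProcTimeSketch(Clock clock) {
        this.clock = clock;
    }

    // PROCTIME(): the attribute is simply the current time when the row is processed.
    long proctime() {
        return clock.millis();
    }

    // Tumbling processing-time window for a row processed right now.
    long windowStart(long sizeMillis) {
        long now = proctime();
        return now - Math.floorMod(now, sizeMillis);
    }
}
```

Processed at 1,234,567 ms, a row falls into the 10-minute window starting at 1,200,000 ms; processed ten minutes later, the same row would fall into the window starting at 1,800,000 ms.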
Defined in a TableSource
Implement getProctimeAttribute() (from the legacy DefinedProctimeAttribute interface) in the TableSource and return the name of the processing-time field from it.
Querying with processing-time windows (no watermarks needed)
SQL:
select TUMBLE_START(`time`, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
from table_name
group by TUMBLE(`time`, INTERVAL '10' MINUTE)
Table API:
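// Needs Tumble and the static Expressions.* helpers ($, lit) from org.apache.flink.table.api
Table result = table
    .window(Tumble.over(lit(10).minutes()).on($("time")).as("myWindow"))
    .groupBy($("myWindow"))
    .select($("myWindow").start(), $("user_name").count().distinct());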
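Window types (SQL group window functions)
- TUMBLE (tumbling window): TUMBLE(`time`, INTERVAL '1' HOUR)
- HOP (hopping/sliding window): HOP(`time`, INTERVAL '1' HOUR, INTERVAL '2' HOUR), where the first interval is the slide and the second is the window size
- SESSION (session window): SESSION(`time`, INTERVAL '1' MINUTE), where the interval is the inactivity gap that closes a session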
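How rows are assigned to HOP and SESSION windows can be sketched in plain Java. The hop logic enumerates every window start that is a multiple of the slide and still covers the row; the session logic sorts the timestamps and starts a new session when a row falls after the current session's end (last row + gap). WindowAssigners and its methods are hypothetical, and the sketch ignores watermarks and per-key partitioning.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class WindowAssigners {

    // HOP(time, slide, size): the row belongs to every window [start, start + size)
    // whose start is a multiple of slide and which still contains ts.
    static List<Long> hopWindowStarts(long ts, long slideMillis, long sizeMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMillis);
        for (long start = lastStart; start > ts - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    // SESSION(time, gap): each session is [first row, last row + gap). A row whose
    // timestamp is later than the current session's end starts a new session.
    static List<long[]> sessionWindows(List<Long> times, long gapMillis) {
        List<Long> sorted = new ArrayList<>(times);
        Collections.sort(sorted);
        List<long[]> sessions = new ArrayList<>();
        long start = 0;
        long end = 0;
        boolean open = false;
        for (long ts : sorted) {
            if (open && ts <= end) {
                end = Math.max(end, ts + gapMillis);   // extend the current session
            } else {
                if (open) {
                    sessions.add(new long[] {start, end});   // close the previous session
                }
                start = ts;
                end = ts + gapMillis;
                open = true;
            }
        }
        if (open) {
            sessions.add(new long[] {start, end});
        }
        return sessions;
    }
}
```

For a row at 01:30 with a 1-hour slide and 2-hour size, hopWindowStarts returns the windows starting at 01:00 and 00:00, so each row is counted in size / slide = 2 windows.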