A dimension table is a data warehouse concept, and the dimension attributes in a dimension table are the angles from which data is observed. When building an offline data warehouse, dimension tables are usually joined with fact tables to form a star schema. The real-time data warehouse also has the concepts of dimension tables and fact tables: the fact table is usually stored in Kafka, while the dimension table is usually stored in an external system (such as MySQL or HBase). Each stream record can be joined against an external dimension table data source to provide lookup queries for real-time calculations. Since a dimension table may change constantly, a dimension table join must specify the point in time whose dimension table snapshot the record is joined against. Note that dimension table JOIN in Flink SQL currently only supports joining the dimension table snapshot at the current moment (processing time semantics); it does not support joining the dimension table snapshot corresponding to the fact table's rowtime (event time semantics). From this article you can learn:
- How to create a table using Flink SQL
- How to define a Kafka data source table
- How to define the MySQL data source table
- What is Temporal Table Join
- Example of a dimension table JOIN
Creating tables with Flink SQL
Note: All operations in this article are performed in the Flink SQL CLI
Syntax for creating tables
CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)

-- Define a physical column
<column_definition>:
  column_name column_type [COMMENT column_comment]

-- Define a computed column
<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

-- Define a watermark
<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
Explanation
Computed columns
A computed column is a virtual column generated with column_name AS computed_column_expression. It is not physically stored in the data source table, and it can be derived from fields, operators, and built-in functions of the original source table. For example, a computed column cost for consumption can be calculated as price * quantity of the same table.
Computed columns are often used to define time attributes (see the Flink Table API & SQL Programming Guide: Time Attributes (3)). A processing time attribute can be defined with the PROCTIME() function, with the syntax proc AS PROCTIME(). A computed column can also be used to extract the event time column, since the original event time may not be of type TIMESTAMP(3) or may be buried inside a JSON string.
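For instance, a minimal sketch of a source table that defines both kinds of computed columns (table, field names, and connector options here are hypothetical) could look like this:

CREATE TABLE orders (
  order_id BIGINT,
  price DECIMAL(10, 2),
  quantity INT,
  cost AS price * quantity,  -- computed column derived from physical columns; not stored in the source
  proc AS PROCTIME()         -- computed column defining a processing time attribute
) WITH (
  ...
);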
Tips:
1. A computed column is defined on the source table and is evaluated after the data is read from the source, as part of the subsequent SELECT query.
2. Computed columns cannot be written to by INSERT statements. The target schema of an INSERT statement may only contain the physical columns of the target table, not computed columns.
Watermark
The watermark defines the event time attribute of a table. Its syntax is:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
rowtime_column_name refers to an existing event time field in the table. Note that this field must be of type TIMESTAMP(3), i.e. of the form yyyy-MM-dd HH:mm:ss; if the raw data is not in this form, a computed column must be defined to convert it.
watermark_strategy_expression defines the watermark generation strategy. The return type of this expression must be TIMESTAMP(3).
Flink provides several common watermark generation strategies:
- Strictly ascending watermark, with syntax:
  WATERMARK FOR rowtime_column AS rowtime_column
  That is, the timestamp itself is used directly as the watermark.
- Ascending watermark, with syntax:
  WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
- Bounded out-of-order watermark, with syntax:
  WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
  For example, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND allows records to arrive up to 5 seconds out of order.
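Putting the two pieces together, here is a sketch (with hypothetical field names) that converts a BIGINT epoch-second field to TIMESTAMP(3) via a computed column and then declares a bounded out-of-order watermark on it:

CREATE TABLE events (
  ts BIGINT,  -- epoch seconds as delivered by the source
  rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),  -- computed column of type TIMESTAMP(3)
  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND  -- tolerate up to 5 seconds of out-of-orderness
) WITH (
  ...
);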
Partitioning
Create a partitioned table based on the specified fields; each partition corresponds to a file path.
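A minimal sketch of a partitioned table declaration (table, columns, and connector options here are hypothetical):

CREATE TABLE logs (
  log_date STRING,
  level STRING,
  message STRING
) PARTITIONED BY (log_date)  -- each distinct log_date value maps to its own file path
WITH (
  ...
);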
WITH options
When creating a table source or table sink, the table's properties must be specified. Properties are configured as key/value pairs; see the documentation of the corresponding connector for details.
Tips:
Note: The table name specified when creating a table has three forms:
(1) catalog_name.db_name.table_name
(2) db_name.table_name
(3) table_name
In the first form, the table is registered in the metadata of the catalog named 'catalog_name' and the database named 'db_name'.
In the second form, the table is registered in the current catalog of the execution environment, in the database named 'db_name'.
In the third form, the table is registered in the current catalog and current database of the execution environment.
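For illustration, the three forms look like this (catalog and database names are hypothetical):

CREATE TABLE my_catalog.my_db.my_table ( ... ) WITH ( ... );  -- form (1): explicit catalog and database
CREATE TABLE my_db.my_table ( ... ) WITH ( ... );             -- form (2): database within the current catalog
CREATE TABLE my_table ( ... ) WITH ( ... );                   -- form (3): current catalog and current database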
Define the Kafka data table
Use Flink SQL to create a Kafka data source table. The syntax is as follows:
CREATE TABLE MyKafkaTable (
  ...
) WITH (
  'connector.type' = 'kafka',                                   -- Mandatory: connector type
  'connector.version' = '0.11',                                 -- Mandatory: Kafka version: 0.8/0.9/0.10/0.11/universal
  'connector.topic' = 'topic_name',                             -- Mandatory: topic name
  'connector.properties.zookeeper.connect' = 'localhost:2181',  -- Mandatory: Zookeeper connection address
  'connector.properties.bootstrap.servers' = 'localhost:9092',  -- Mandatory: Kafka broker connection address
  'connector.properties.group.id' = 'testGroup',                -- Optional: consumer group
  -- Optional: startup offset, earliest-offset/latest-offset/group-offsets/specific-offsets
  'connector.startup-mode' = 'earliest-offset',
  -- Optional: specific offsets, used together with the specific-offsets startup mode
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'connector.sink-partitioner' = '...',                         -- Optional: sink partitioner, fixed/round-robin/custom
  -- Optional: when using a custom partitioner, specify the partitioner class name
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  'format.type' = '...',                                        -- Mandatory: format, csv/json/avro
  'update-mode' = 'append'                                      -- Optional: update mode, append/retract/upsert
)
Tips:
- Start offset: by default, consumption starts from the offsets committed by the current consumer group; a specific offset can be set via specific-offsets.
- Sink partitioner: by default, data is written to at most as many partitions as the sink's parallelism (each parallel sink instance writes to exactly one partition); a custom partitioning strategy can also be used. The round-robin partitioner avoids unbalanced partitioning, but it creates a large number of network connections between the Flink instances and the Kafka brokers.
- Consistency guarantee: the default sink semantics is at-least-once.
- Kafka 0.10+ timestamps: starting with Kafka 0.10, Kafka messages carry a timestamp as message metadata, indicating when the record was written to the Kafka topic. This timestamp can be used as an event time attribute.
- Kafka 0.11+ versions: starting from Flink 1.7, the universal Kafka connector version is supported, which is compatible with Kafka 0.11 and later.
Define MySQL data tables
CREATE TABLE MySQLTable (
  ...
) WITH (
  'connector.type' = 'jdbc',                                     -- Mandatory: use the JDBC connector
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',    -- Mandatory: JDBC URL
  'connector.table' = 'jdbc_table_name',                         -- Mandatory: table name
  'connector.driver' = 'com.mysql.jdbc.Driver',                  -- Optional: JDBC driver class
  'connector.username' = 'name',                                 -- Optional: database username
  'connector.password' = 'password',                             -- Optional: database password
  'connector.read.partition.column' = 'column_name',             -- Optional: name of the column used to partition the input
  'connector.read.partition.num' = '50',                         -- Optional: number of partitions
  'connector.read.partition.lower-bound' = '500',                -- Optional: smallest value of the first partition
  'connector.read.partition.upper-bound' = '1000',               -- Optional: largest value of the last partition
  'connector.read.fetch-size' = '100',                           -- Optional: number of rows fetched per round trip; default 0
  'connector.lookup.cache.max-rows' = '5000',                    -- Optional: maximum number of rows in the lookup cache; old rows are evicted when exceeded
  'connector.lookup.cache.ttl' = '10s',                          -- Optional: maximum time-to-live of a lookup cache entry; note that cache.max-rows and cache.ttl must be configured together
  'connector.lookup.max-retries' = '3',                          -- Optional: maximum retries when a lookup query fails
  'connector.write.flush.max-rows' = '5000',                     -- Optional: maximum number of buffered rows before a flush is triggered
  'connector.write.flush.interval' = '2s',                       -- Optional: flush interval; buffered rows are flushed by an asynchronous thread after this interval; default 0s
  'connector.write.max-retries' = '3'                            -- Optional: maximum retries when writing fails
)
Temporal Table Join
Syntax
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
Note: Currently, only INNER JOIN and LEFT JOIN are supported. FOR SYSTEM_TIME AS OF table1.proctime must follow the temporal (dimension) table, where table1.proctime is the processing time attribute (computed column) of table1. FOR SYSTEM_TIME AS OF table1.proctime means that when a record of the left table joins the right-hand dimension table, it only matches the snapshot of the dimension table at the current processing time.
Example
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
Usage notes
- Only Blink Planner is supported
- Only SQL is supported. Currently, the Table API is not supported
- Currently, temporal table join based on Event time is not supported
- The dimension table may change over time. If dimension table data is changed (inserted, updated, or deleted) after a join has already happened, the already-joined results are not updated retroactively
- Two dimension tables cannot be joined with each other
- A dimension table must specify a primary key. When joining a dimension table, the ON condition must contain equality conditions on all primary key fields
Dimension table Join example
Background
Kafka holds user behavior data, covering the pv, buy, cart, and fav behaviors; MySQL holds a dimension table of provinces and regions. Now join the two tables to count the number of purchase (buy) actions in each region.
Steps
Create the dimension table data source in MySQL:
CREATE TABLE dim_province (
  province_id BIGINT,     -- province ID
  province_name VARCHAR,  -- province name
  region_name VARCHAR     -- region name
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://192.168.10.203:3306/mydw',
  'connector.table' = 'dim_province',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'root',
  'connector.password' = '123qwe',
  'connector.lookup.cache.max-rows' = '5000',
  'connector.lookup.cache.ttl' = '10min'
);
The fact table is stored in Kafka, and the data is the user click behavior. The format is JSON. The specific data sample is as follows:
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
Create the Kafka data source table as follows:
CREATE TABLE user_behavior (
  user_id BIGINT,   -- user ID
  item_id BIGINT,   -- item ID
  cat_id BIGINT,    -- category ID
  action STRING,    -- user behavior type
  province INT,     -- user's province
  ts BIGINT,        -- timestamp of the user behavior
  proctime AS PROCTIME(),  -- computed column: processing time attribute
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),  -- computed column: event time
  WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- define the watermark on eventTime
) WITH (
  'connector.type' = 'kafka',                    -- use the kafka connector
  'connector.version' = 'universal',             -- kafka version; universal supports 0.11+
  'connector.topic' = 'user_behavior',           -- kafka topic
  'connector.startup-mode' = 'earliest-offset',  -- read from the earliest offset
  'connector.properties.group.id' = 'group1',    -- consumer group
  'connector.properties.zookeeper.connect' = 'kms-2:2181,kms-3:2181,kms-4:2181',  -- zookeeper address
  'connector.properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',  -- kafka broker address
  'format.type' = 'json'                         -- the data source format is json
);
Create a MySQL result table to store regional sales:
CREATE TABLE region_sales_sink (
  region_name STRING,  -- region name
  buy_cnt BIGINT       -- sales (purchase count)
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://192.168.10.203:3306/mydw',
  'connector.table' = 'top_region',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'root',
  'connector.password' = '123qwe',
  'connector.write.flush.interval' = '1s'
);
Join user behavior data with provincial dimension table data
CREATE VIEW user_behavior_detail AS
SELECT
u.user_id,
u.item_id,
u.cat_id,
u.action,
p.province_name,
p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province = p.province_id;
Calculate the sales volume of the region and write the calculation result to MySQL
INSERT INTO region_sales_sink
SELECT
region_name,
COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE action = 'buy'
GROUP BY region_name;
Viewing the results:
Flink SQL> select * from region_sales_sink; -- check in the Flink SQL CLI
mysql> select * from top_region; -- check in the MySQL database
Refer to the article: segmentfault.com/a/119000002…