Author: Yu Ao

This article introduces the practice of JOIN in streaming scenarios. Joins are frequently used in SQL-based data analysis. In offline scenarios, the data sets being joined are bounded and can be cached for lookup, so multi-table joins such as Nested Loop Join, Hash Join, and Sort Merge Join can be performed. In real-time scenarios, however, the data on both sides of the join are unbounded streams: for long-running jobs, caching the full data sets puts great pressure on storage and query, and the arrival times of the two streams may also be inconsistent, making the join results inaccurate. Therefore, Flink SQL provides multiple join methods to help users handle the various join scenarios.

This article mainly introduces the practical application of the regular join, the interval join, and the temporal table join. It covers the following parts:

  • Data preparation
  • Flink SQL Regular Join
  • Flink SQL Interval Join
  • Flink SQL Temporal Table Join
  • Conclusion

01 Data Preparation

Most of the company's real-time data resides in Kafka, while dimension data such as ad-space metadata is stored in a relational database like MySQL; Flink SQL provides Kafka and JDBC connectors to access both.

  • Register the two data streams that need to be joined as Flink Kafka tables. For the click stream, we define a Processing Time attribute (for the temporal table join) as well as Event Time and a Watermark (for the dual-stream join). For the exposure stream, we define only Event Time and a Watermark for the dual-stream join. First, the click stream:
DROP TABLE IF EXISTS flink_rtdw.demo.adsdw_dwd_max_click_mobileapp;
CREATE TABLE flink_rtdw.demo.adsdw_dwd_max_click_mobileapp (
    ...
    publisher_adspace_adspaceId INT COMMENT 'ad space ID',
    ...
    audience_behavior_click_creative_impressionId BIGINT COMMENT 'impressionId of the ad creative clicked by the audience user',
    audience_behavior_click_timestamp BIGINT COMMENT 'timestamp (ms) at which the audience user clicked the ad',
    ...
    procTime AS PROCTIME(),
    ets AS TO_TIMESTAMP(FROM_UNIXTIME(audience_behavior_click_timestamp / 1000)),
    WATERMARK FOR ets AS ets - INTERVAL '5' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'adsdw.dwd.max.click.mobileapp',
    'properties.group.id' = 'adsdw.dwd.max.click.mobileapp_group',
    'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,broker3:9092',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-administrator" password="kafka-administrator-password";',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'avro-confluent.schema-registry.url' = 'http://schema.registry.url:8081',
    'avro-confluent.schema-registry.subject' = 'adsdw.dwd.max.click.mobileapp-value',
    'format' = 'avro-confluent'
);
  • Register the exposure stream as a Flink Kafka table:
DROP TABLE IF EXISTS flink_rtdw.demo.adsdw_dwd_max_show_mobileapp;
CREATE TABLE flink_rtdw.demo.adsdw_dwd_max_show_mobileapp (
    ...
    audience_behavior_watch_creative_impressionId BIGINT COMMENT 'impressionId of the ad creative viewed by the audience user',
    audience_behavior_watch_timestamp BIGINT COMMENT 'timestamp (ms) at which the audience user viewed the ad',
    ...
    ets AS TO_TIMESTAMP(FROM_UNIXTIME(audience_behavior_watch_timestamp / 1000)),
    WATERMARK FOR ets AS ets - INTERVAL '5' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'adsdw.dwd.max.show.mobileapp',
    'properties.group.id' = 'adsdw.dwd.max.show.mobileapp_group',
    'properties.bootstrap.servers' = 'broker1:9092,broker2:9092,broker3:9092',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-administrator" password="kafka-administrator-password";',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'avro-confluent.schema-registry.url' = 'http://schema.registry.url:8081',
    'avro-confluent.schema-registry.subject' = 'adsdw.dwd.max.show.mobileapp-value',
    'format' = 'avro-confluent'
);
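The temporal table join in section 4 also needs the ad-space dimension table registered from MySQL. That registration is not shown in the original text; a minimal sketch using the Flink JDBC connector might look like the following (the connection URL, table name, and credentials are placeholders, and the lookup-cache options are optional):

```sql
-- Hypothetical registration of the MySQL ad-space dimension table
-- referenced by the temporal table join in section 4. Only the ID
-- and name columns are used later; connection details are placeholders.
CREATE TABLE mysql_dim_table (
    ID   INT,
    name STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql-host:3306/ad_db',
    'table-name' = 'ad_space',
    'username' = 'user',
    'password' = 'password',
    -- optional lookup cache to reduce pressure on MySQL
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '10min'
);
```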

02 Flink SQL Regular Join

Regular join is the most common type of join. A regular join supports no time window or time attribute: any new record on either side of the join is matched against all past and future records from the other side, and any change on either side is visible and directly affects the whole join result. Because regular joins have no state-eviction policy, historical data is never cleaned up; this means the join state keeps growing, but it also means a regular join supports arbitrary update operations on both streams. Regular joins are therefore suitable for offline scenarios and scenarios with small data volumes.
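Because the join state grows without bound, a state retention time is usually configured in practice when a regular join has to run on streams. A sketch follows; the exact configuration key depends on the Flink version (recent versions expose it as `table.exec.state.ttl`, older ones configured idle state retention on `TableConfig` instead):

```sql
-- Evict join state that has been idle for more than one day (value in ms).
-- This trades result completeness for bounded state size.
SET 'table.exec.state.ttl' = '86400000';
```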

  • Syntax
SELECT columns
FROM t1  [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1
  • Application scenarios: offline scenarios and small data volumes

  • Using the data prepared in section 1, we perform a simple regular join: the click stream and the exposure stream are joined on impressionId, and the ad space and impressionId are output. The SQL statement is as follows:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId as click_impressionId,
       adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId as show_impressionId
from adsdw_dwd_max_click_mobileapp  
inner join adsdw_dwd_max_show_mobileapp 
on adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId = adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId;
  • The job submitted to the Flink cluster and its output are as follows:

03 Flink SQL Interval Join

In contrast to a regular join, an interval join uses a window to place a time constraint between the two input tables: data outside the time bounds is invisible to the join and can be cleaned up. This fixes the two problems of the regular join, namely the resource consumption and result errors caused by having no data-eviction policy. To use an interval join, a time attribute field must be defined, which can be either Processing Time or Event Time extracted from the data itself. With Processing Time, the Flink framework periodically cleans up data according to system-time windows; with Event Time, Flink allocates event-time windows and cleans up data according to the configured watermark. In the data preparation above, we extracted event-time attribute fields for the click stream and the exposure stream, and set a watermark that tolerates 5 minutes of out-of-order data. Interval joins support inner, left outer, right outer, and full outer joins. Because an interval join only needs to cache data within the time bounds, it occupies less storage and computes more accurate real-time join results.

  • Syntax
SELECT columns
FROM t1 [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1
AND t1.timestamp BETWEEN t2.timestamp AND t2.timestamp + INTERVAL '10' MINUTE;

SELECT columns
FROM t1 [AS <alias1>]
[LEFT/INNER/FULL OUTER] JOIN t2
ON t1.column1 = t2.key-name1
AND t2.timestamp <= t1.timestamp AND t1.timestamp <= t2.timestamp + INTERVAL '10' MINUTE;
  • How to set the boundary conditions:
right.timestamp ∈ [left.timestamp + lowerBound, left.timestamp + upperBound]
  • Application scenarios: dual-stream join scenarios

  • Using the data from section 1, we perform an interval join (written with BETWEEN ... AND): the click stream and the exposure stream are joined on impressionId, with the boundary condition that the click occurs between the exposure and 10 minutes after the exposure. The ad space and impressionId are output with the following SQL statement:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId as click_impressionId,
       adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId as show_impressionId
from adsdw_dwd_max_click_mobileapp
inner join adsdw_dwd_max_show_mobileapp
on adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId = adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId and
   adsdw_dwd_max_click_mobileapp.ets between adsdw_dwd_max_show_mobileapp.ets and adsdw_dwd_max_show_mobileapp.ets + INTERVAL '10' MINUTE;
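To relate this query to the boundary formula above: taking the click stream as the left table and the exposure stream as the right table, the predicate `click.ets BETWEEN show.ets AND show.ets + INTERVAL '10' MINUTE` corresponds to lowerBound = -10 minutes and upperBound = 0. A generic sketch with placeholder tables L and R:

```sql
-- right.ts ∈ [left.ts + lowerBound, left.ts + upperBound]
-- here lowerBound = -INTERVAL '10' MINUTE and upperBound = 0,
-- which is equivalent to: L.ts BETWEEN R.ts AND R.ts + INTERVAL '10' MINUTE
SELECT ...
FROM L
JOIN R
  ON L.key = R.key
 AND R.ts BETWEEN L.ts - INTERVAL '10' MINUTE AND L.ts;
```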

The job submitted to the Flink cluster and its output are as follows:

  • There are several ways to write an interval join. Using the data from section 1, we now implement the same logic with <= comparisons: the click stream and the exposure stream are joined on impressionId, with the boundary condition that the click occurs between the exposure and 10 minutes after the exposure, and the ad space and impressionId are output. The SQL statement is as follows:
select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId as click_impressionId,
       adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId as show_impressionId
from adsdw_dwd_max_click_mobileapp  
inner join adsdw_dwd_max_show_mobileapp
on adsdw_dwd_max_click_mobileapp.audience_behavior_click_creative_impressionId = adsdw_dwd_max_show_mobileapp.audience_behavior_watch_creative_impressionId and 
   adsdw_dwd_max_show_mobileapp.ets <= adsdw_dwd_max_click_mobileapp.ets and adsdw_dwd_max_click_mobileapp.ets <= adsdw_dwd_max_show_mobileapp.ets + INTERVAL '10' MINUTE;
  • The job submitted to the Flink cluster and its output are as follows:

04 Flink SQL Temporal Table Join

As described in the previous section, the interval join provides a data-eviction strategy that solves the resource problem and makes the computation more accurate. Its premise is that both streams of the join carry time attributes and that a lower time bound is defined so that data can be evicted. This clearly does not fit dimension table joins, because a dimension table has no time bound. For this scenario, Flink provides the temporal table join.

In regular joins and interval joins, the two sides of the join are equal: any update of one table is matched against the historical records of the other. In a temporal table join, by contrast, an update of the temporal (dimension) table is invisible to records of the other table whose time precedes that update. A typical use case of the temporal table join is joining the click stream with the ad-space dimension table to bring in the Chinese name of each ad space.

  • Syntax
SELECT columns
FROM t1  [AS <alias1>]
[LEFT] JOIN t2 FOR SYSTEM_TIME AS OF t1.proctime [AS <alias2>]
ON t1.column1 = t2.key-name1
  • Application scenarios: dimension table join scenarios

Using the data from section 1, we perform a temporal table join: the click stream and the ad-space dimension table are joined on ad space ID, and the ad space and its Chinese name are output. The SQL statement is as follows:

select adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId as publisher_adspace_adspaceId,
       mysql_dim_table.name as publisher_adspace_name
from adsdw_dwd_max_click_mobileapp
join mysql_dim_table FOR SYSTEM_TIME AS OF adsdw_dwd_max_click_mobileapp.procTime
on adsdw_dwd_max_click_mobileapp.publisher_adspace_adspaceId = mysql_dim_table.ID;
  • The job submitted to the Flink cluster and its output are as follows:

05 Conclusion

The three join modes of Flink SQL have been briefly introduced above. For streaming joins, an interval join is recommended for dual-stream joins, and a temporal table join is recommended for joins between a stream and a dimension table.

About the author

Yu Ao, senior engineer of 360 Data Development, currently focuses on the construction and platformization of real-time data warehouses based on Flink, with rich experience in ETL and data warehouse development using Flink, Kafka, Hive, Spark, etc.