1. Introduction
After reading so many technical articles, do you always come away knowing what the author wanted you to learn?

"Big Data Sheep Says" aims to make sure you do:

1. The author states up front what this article is meant to help you with, so you know the intent straight away
2. The author starts from real application scenarios and cases, not from a bare pile of knowledge points
3. The author analyzes the principles behind the important points, so that you can understand them with ease

Now, on to the main text.
For the accompanying source code, reply "1.13.2 SQL JOIN: the fantastic parsing road" in the background of the official account to get it.
The table of contents below doubles as this article's conclusions; skim it to see quickly what the article can help you with:

- Background and application scenarios: join is the most common operation in the offline data warehouse, and it is just as indispensable in the real-time data warehouse. Flink SQL provides a rich set of join modes (six in total: regular join, lookup (dimension table) join, temporal join, interval join, array expansion, and table function join) as powerful backing for meeting these needs
- Flink SQL's join solutions, introduced through a running case: an exposure log LEFT JOIN a click log
- Among these solutions, left join, right join, and full join all produce retract messages, so you should fully understand how regular joins operate before using them, to avoid duplicate or excessive output
- This article focuses on the retract problem of regular joins; interval join, which avoids the retract problem while still meeting the case requirement above, is covered in the follow-up article
2. Background and application scenarios
In our daily work, joins are certainly among the most widely used operations. For example:

- Computing the CTR of exposure data against click data requires joining the two on a unique ID
- Fact data is joined with dimension data to obtain dimensions, which in turn feed dimension-level metrics

These scenarios are so widespread in the offline warehouse that they need no further introduction.

So how do you join real-time streams?
Flink SQL gives us a set of powerful join modes to implement joins in streaming scenarios, as shown in the screenshot from the official website below:
[Figure: join types in the official Flink documentation]
- Regular join: left join, right join, full join, inner join
- Lookup join: association with a dimension table
- Temporal join: join against a versioned (snapshot) table
- Interval join: join between two streams within a specified time range
- Array expansion: turning an array column into rows
- Table function join: a join driven by a user-defined table function (similar to array expansion or a lookup join)
In the real-time data warehouse, regular join, interval join, and combinations of the two are the most commonly used, so this article mainly introduces these two kinds (an overly long article might put you off, so the goal from here on is to stay concise).
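For orientation, here are minimal sketches of two of the modes this article does not cover: a lookup join and an array expansion. All table and column names here are made up for illustration, and the lookup join assumes proc_time was declared as a processing-time attribute in the DDL.

```sql
-- Lookup (dimension table) join: enrich each order with the dimension row
-- as of the order's processing time.
SELECT o.order_id, o.user_id, d.user_name
FROM orders AS o
JOIN dim_users FOR SYSTEM_TIME AS OF o.proc_time AS d
    ON o.user_id = d.user_id;

-- Array expansion: turn an array column into one output row per element.
SELECT t.id, tag
FROM tags_table AS t
CROSS JOIN UNNEST(t.tags) AS tag_rows (tag);
```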
3. A case to start with
Let's start with a practical case: for a given set of input data, what should the output look like?

Scenario: the common one of an exposure log stream (show_log) joined with a click log stream (click_log) on log_id.

The input data:
Exposure data:

| log_id | timestamp | show_params |
| --- | --- | --- |
| 1 | 2021-11-01 00:01:03 | show_params |
| 2 | 2021-11-01 00:03:00 | show_params2 |
| 3 | 2021-11-01 00:05:00 | show_params3 |
Click data:

| log_id | timestamp | click_params |
| --- | --- | --- |
| 1 | 2021-11-01 00:01:53 | click_params |
| 2 | 2021-11-01 00:02:01 | click_params2 |
The expected output:

| log_id | timestamp | show_params | click_params |
| --- | --- | --- | --- |
| 1 | 2021-11-01 00:01:03 | show_params | click_params |
| 2 | 2021-11-01 00:03:00 | show_params2 | click_params2 |
| 3 | 2021-11-01 00:05:00 | show_params3 | null |
If you are familiar with offline Hive SQL, you can probably write this SQL in ten seconds:
```sql
INSERT INTO sink_table
SELECT
    show_log.log_id AS log_id,
    show_log.`timestamp` AS `timestamp`,
    show_log.show_params AS show_params,
    click_log.click_params AS click_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;
```
Now, back to the requirement: what do we need to do to implement it in Flink SQL?

Flink SQL does provide the left join capability, but using it naively can produce unexpected problems. More on this in the next section.
4. Flink SQL join

4.1. Flink SQL regular join
Taking the case above, let's actually run it and see the results:
```sql
INSERT INTO sink_table
SELECT
    show_log.log_id AS log_id,
    show_log.`timestamp` AS `timestamp`,
    show_log.show_params AS show_params,
    click_log.click_params AS click_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;
```
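For completeness, here is a minimal sketch of the DDL such a job would need. The connector choices, topic names, and server address are assumptions for illustration, not taken from the original job; a print sink is used because it accepts retract streams as-is.

```sql
CREATE TABLE show_log (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    show_params STRING
) WITH (
    'connector' = 'kafka',                             -- assumed source connector
    'topic' = 'show_log',                              -- assumed topic name
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE click_log (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    click_params STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'click_log',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE sink_table (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    show_params STRING,
    click_params STRING
) WITH (
    'connector' = 'print'                              -- assumed sink for local testing
);
```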
The operator graph in the Flink Web UI is as follows:

[Figure: Flink Web UI operator graph]
The results are as follows:
```
+[1 | 2021-11-01 00:01:03 | show_params | null]
-[1 | 2021-11-01 00:01:03 | show_params | null]
+[1 | 2021-11-01 00:01:03 | show_params | click_params]
+[2 | 2021-11-01 00:03:00 | show_params2 | click_params2]
+[3 | 2021-11-01 00:05:00 | show_params3 | null]
```
From the + and - markers you can see the output is a retract stream. The reason: the first show_log record arrived before its click_log record, so +[1 | 2021-11-01 00:01:03 | show_params | null] was emitted directly; when the click_log record arrived later, the previously emitted show_log row without a match was retracted, and the matched result +[1 | 2021-11-01 00:01:03 | show_params | click_params] was emitted.
A retract stream writes much more data to Kafka than necessary, which is unacceptable here; what we want is an append-only stream.
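A side note on the sink: a plain 'kafka' sink only accepts append-only data, so Flink will reject this left join query against it at planning time. One way to at least make the job runnable is an upsert-kafka sink keyed on log_id, which folds the retract pairs into upserts per key. A sketch follows (topic and server address are assumptions), though this only masks the churn at the sink rather than removing it:

```sql
CREATE TABLE sink_table (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    show_params STRING,
    click_params STRING,
    PRIMARY KEY (log_id) NOT ENFORCED  -- upsert key; required by upsert-kafka
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'join_result',           -- assumed topic name
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
```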
Why does left join behave this way? Let's start from how left join is implemented.

To locate the concrete implementation, we can inspect the generated transformations:
[Figure: transformations]
You can see that the operator implementing the left join is org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.
Its core logic centers on the processElement method, whose processing logic is documented with detailed comments in the source code, as shown in the figure below.

[Figure: StreamingJoinOperator#processElement]

The comments show the logic is fairly involved, so let's walk through left join, inner join, right join, and full join one by one.
4.2. Left join

Take show_log (left table) LEFT JOIN click_log (right table) as the example:
- First, if the condition in join ... on is an equality, the join is executed per key: here the join keys are show_log.log_id and click_log.log_id, and records with the same key are shuffled to the same parallel subtask for processing. If the condition contains no equality, the source operators of the two streams send their data to the join operator with a global partition strategy, the join operator's parallelism is set to 1, and all data is processed by that single subtask.
- For the same key, when a show_log record arrives and click_log already has data: iterate over all the matching click_log data and output [+(show_log, click_log)], then save show_log to the left table's state (for subsequent joins).
- For the same key, when a show_log record arrives and click_log has no data: output [+(show_log, null)] and save show_log to the left table's state (for subsequent joins).
- For the same key, when a click_log record arrives and show_log has data: associate the click_log record with all the show_log data. Before outputting, if an associated show_log record was previously emitted without a click_log match (i.e., as [+(show_log, null)]), first emit [-(show_log, null)] and then [+(show_log, click_log)]: the intermediate result without a click_log match is retracted and the latest matched result is emitted. Then save click_log to the right table's state (for subsequent joins with the left table). This is precisely why the output is a retract stream.
- For the same key, when a click_log record arrives and show_log has no data: save click_log to the right table's state (for subsequent joins with the left table).
4.3. Inner join

Take show_log (left table) INNER JOIN click_log (right table) as the example:
- The join-key and partitioning behavior is the same as described for left join above.
- For the same key, when a show_log record arrives and click_log has data: iterate over all the matching click_log data and output [+(show_log, click_log)], then save show_log to the left table's state (for subsequent joins).
- For the same key, when a show_log record arrives and click_log has no data: output nothing, and save show_log to the left table's state (for subsequent joins).
- For the same key, when a click_log record arrives and show_log has data: iterate over all the matching show_log data and output [+(show_log, click_log)], then save click_log to the right table's state (for subsequent joins).
- For the same key, when a click_log record arrives and show_log has no data: output nothing, and save click_log to the right table's state (for subsequent joins).
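To make the contrast with left join concrete: by the rules above, the same case rewritten as an inner join emits no retractions at all. With the sample data, rows 1 and 2 are each output exactly once, and row 3 never appears because it has no click:

```sql
INSERT INTO sink_table
SELECT
    show_log.log_id AS log_id,
    show_log.`timestamp` AS `timestamp`,
    show_log.show_params AS show_params,
    click_log.click_params AS click_params
FROM show_log
INNER JOIN click_log ON show_log.log_id = click_log.log_id;

-- Expected output (append-only, no +/- pairs):
-- +[1 | 2021-11-01 00:01:03 | show_params  | click_params]
-- +[2 | 2021-11-01 00:03:00 | show_params2 | click_params2]
```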
4.4. Right join

Right join is the mirror image of left join: the same logic with the two sides swapped.
4.5. Full join

Take show_log (left table) FULL JOIN click_log (right table) as the example:
- The join-key and partitioning behavior is the same as described for left join above.
- For the same key, when a show_log record arrives and click_log has data: associate the show_log record with all the click_log data. Before outputting, if an associated click_log record was previously emitted without a show_log match (i.e., as [+(null, click_log)]), first emit [-(null, click_log)] and then [+(show_log, click_log)]: the intermediate click_log result without a show_log match is retracted and the latest matched result is emitted. Then save show_log to the left table's state (for subsequent joins).
- For the same key, when a show_log record arrives and click_log has no data: output [+(show_log, null)] and save show_log to the left table's state (for subsequent joins).
- For the same key, when a click_log record arrives and show_log has data: associate the click_log record with all the show_log data. Before outputting, if an associated show_log record was previously emitted without a click_log match (i.e., as [+(show_log, null)]), first emit [-(show_log, null)] and then [+(show_log, click_log)]: the intermediate show_log result without a click_log match is retracted and the latest matched result is emitted. Then save click_log to the right table's state (for subsequent joins).
- For the same key, when a click_log record arrives and show_log has no data: click_log does not wait; output [+(null, click_log)] and save click_log to the right table's state (for subsequent joins).
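For intuition, assuming the sample records arrive in timestamp order (show 1, click 1, click 2, show 2, show 3), replaying them through a full join would produce something like the following; note the retractions now happen in both directions:

```
+[1 | 2021-11-01 00:01:03 | show_params | null]
-[1 | 2021-11-01 00:01:03 | show_params | null]
+[1 | 2021-11-01 00:01:03 | show_params | click_params]
+[null | null | null | click_params2]
-[null | null | null | click_params2]
+[2 | 2021-11-01 00:03:00 | show_params2 | click_params2]
+[3 | 2021-11-01 00:05:00 | show_params3 | null]
```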
4.6. Summary of regular join

Overall, the four join types above can be grouped as follows.

- Inner join: the two sides wait for each other; a result is emitted only once both sides have matching data.
- Left join, right join, and full join: the two sides do not wait for each other. As soon as a record arrives it tries to match; if it matches, the full row is emitted, and if not, the other side's fields are emitted as null. When data arriving later matches a record that was previously emitted unmatched, a retraction is issued and the newly matched result is emitted.
4.7. How do we solve the duplicate data that retract causes to be written to Kafka?
Since Flink SQL implements left join, right join, and full join via retraction, they cannot meet this business requirement as they are.

Let's change our thinking. The defining trait of the joins above is that the two sides do not wait for each other. Is there a join where they do wait? For example: when a show_log record from the left table finds no match in the right table, it waits for a period of time; if no match arrives within that window, it outputs (show_log, null), and if a match does arrive, it outputs (show_log, click_log).

Enter interval join. How interval join implements this scenario, and the principles behind its implementation, will be introduced in detail in the follow-up article. Stay tuned.
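As a preview, here is a minimal sketch of what such a query might look like. The event-time attribute row_time (a watermarked column that would have to be declared in the DDL) and the 10-minute wait window are assumptions for illustration; the details follow in the next article.

```sql
-- Interval join sketch: a show_log row waits up to 10 minutes for its click.
INSERT INTO sink_table
SELECT
    show_log.log_id AS log_id,
    show_log.`timestamp` AS `timestamp`,
    show_log.show_params AS show_params,
    click_log.click_params AS click_params
FROM show_log
LEFT JOIN click_log
    ON show_log.log_id = click_log.log_id
    AND click_log.row_time BETWEEN show_log.row_time
        AND show_log.row_time + INTERVAL '10' MINUTE;
```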
5. Summary and outlook
For the accompanying source code, reply "1.13.2 SQL JOIN: the fantastic parsing road" in the background of the official account to get it.
This article introduced the problems Flink SQL regular joins run into in a join scenario, and explained their operating principles by analyzing the implementation. The main points:

- Background and application scenarios: join is the most common operation in the offline data warehouse, and it is just as indispensable in the real-time data warehouse. Flink SQL provides a rich set of join modes (six in total: regular join, lookup (dimension table) join, temporal join, interval join, array expansion, and table function join) as powerful backing for meeting these needs
- Flink SQL's join solutions, introduced through a running case: an exposure log LEFT JOIN a click log
- Among these solutions, left join, right join, and full join all produce retract messages, so you should fully understand how regular joins operate before using them, to avoid duplicate or excessive output
- This article focused on the retract problem of regular joins; interval join, which avoids the retract problem while still meeting the case requirement above, is covered in the follow-up article