In both OLAP and OLTP workloads, Join is one of the most frequently used SQL operations, and one with complex optimization rules. For offline computing, Join semantics and implementations have matured over decades of database development; for Streaming SQL, which has emerged only in recent years, Join is still in its infancy.

The most critical problem is that Join implementations depend on caching the entire data set, whereas the input to a Streaming SQL Join is an unbounded stream, so memory pressure and computing efficiency are unavoidable problems in long-running jobs. The following sections analyze how Flink SQL solves these problems and joins two data streams.

Offline Batch SQL Join implementation

Traditional offline Batch SQL (SQL over bounded data sets) has three basic Join implementations: Nested-loop Join, Sort-Merge Join, and Hash Join.

  • Nested-loop Join is the simplest and most direct method: both data sets are loaded into memory, and every element of one is compared with every element of the other to check the Join condition. Nested-loop Join is the least efficient in both time and space, but it is flexible and broadly applicable, so its variant, Block Nested-loop Join (BNL), is often the default fallback for Joins in traditional databases.
  • Sort-Merge Join, as the name implies, has two phases, Sort and Merge. First the two data sets are sorted separately; then the two ordered data sets are traversed and matched, similar to the merge step of merge sort. It is worth noting that Sort-Merge Join applies only to equi-Joins (all Join conditions use equality as the comparison operator). Since sorting both data sets is expensive, Sort-Merge Join is usually chosen as an optimization when the inputs are already sorted.
  • Hash Join also has two phases: first one data set is converted into a hash table, then the elements of the other data set are iterated and matched against the hash table. The first phase and its data set are called the Build phase and Build table; the second phase and its data set are called the Probe phase and Probe table. Hash Join is efficient but memory-hungry, so it is usually an optimization applied when one side of the Join is a small table that fits in memory. Like Sort-Merge Join, Hash Join applies only to equi-Joins.
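To make the three algorithms concrete, here is a minimal Python sketch of each, under the assumption that the inputs are lists of (key, value) pairs and the Join is an equi-Join on the key. This is illustrative only, not how any particular database implements them.

```python
def nested_loop_join(left, right):
    # Compare every element of one data set with every element of the other.
    return [(k1, v1, v2) for k1, v1 in left
            for k2, v2 in right if k1 == k2]

def sort_merge_join(left, right):
    # Sort phase: sort both inputs by key.
    left, right = sorted(left), sorted(right)
    out, i, j = [], 0, 0
    # Merge phase: advance the side with the smaller key, emit on equal keys.
    while i < len(left) and j < len(right):
        if left[i][0] < right[j][0]:
            i += 1
        elif left[i][0] > right[j][0]:
            j += 1
        else:
            key = left[i][0]
            j_end = j
            while j_end < len(right) and right[j_end][0] == key:
                j_end += 1
            while i < len(left) and left[i][0] == key:
                for jj in range(j, j_end):
                    out.append((key, left[i][1], right[jj][1]))
                i += 1
            j = j_end
    return out

def hash_join(build, probe):
    # Build phase: hash the (usually smaller) Build table by key.
    table = {}
    for k, v in build:
        table.setdefault(k, []).append(v)
    # Probe phase: look each Probe-table element up in the hash table.
    return [(k, bv, pv) for k, pv in probe for bv in table.get(k, [])]
```

All three produce the same result for the same equi-Join; they differ only in cost: nested-loop is O(n·m), sort-merge pays for two sorts, and hash join pays memory for the Build table.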

Streaming SQL Join

Compared with offline Joins, real-time Streaming SQL (SQL over unbounded data sets) cannot cache all the data, so the sorting required by Sort-Merge Join is essentially impossible. Nested-loop Join and Hash Join, however, can meet the requirements of real-time SQL after some adaptation. Let us look at the basic implementation of Nested-loop Join in real-time Streaming SQL through an example (case and figures from Piotr Nowojski's Flink Forward San Francisco talk [2]).





Figure 1. Join in continuous query (1)

Table A has two elements, 1 and 42, and Table B has one element, 42, so the Join outputs 42.





Figure 2. Join in continuous query (2)

Table B then receives three new elements: 7, 3, and 1. Because 1 matches an element in Table A, the result table outputs another row, 1.





Figure 3. Join in continuous query (3)

Then new elements 2, 3, and 6 arrive in Table A; 3 matches an element in Table B, so 3 is output to the result table.
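The three-step walkthrough above can be reproduced with a minimal Python simulation: each side buffers every record it has seen, and each new record probes the full history of the other side. This is illustrative only, not Flink's actual operator.

```python
def make_streaming_join():
    state_a, state_b = [], []
    def on_record(side, value):
        # Buffer the record, then probe the other side's full history.
        if side == "A":
            state_a.append(value)
            return [value for other in state_b if other == value]
        else:
            state_b.append(value)
            return [value for other in state_a if other == value]
    return on_record

join = make_streaming_join()
results = []
# Figure 1: Table A has 1 and 42, Table B has 42.
for v in (1, 42):
    results += join("A", v)
results += join("B", 42)          # 42 matches -> emits 42
# Figure 2: Table B receives 7, 3, 1.
for v in (7, 3, 1):
    results += join("B", v)       # 1 matches -> emits 1
# Figure 3: Table A receives 2, 3, 6.
for v in (2, 3, 6):
    results += join("A", v)       # 3 matches -> emits 3
print(results)                    # [42, 1, 3]
```

Note that `state_a` and `state_b` only ever grow, which is exactly the state-size problem discussed next.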

As can be seen, a nested-loop Join must retain the full contents of both input tables. As time passes, the historical data of Table A and Table B grows without bound, leading to unreasonable memory and disk usage and degrading the matching efficiency for each element. Hash Join suffers from the same problem.

Is it possible to set up a cache-eviction strategy that cleans up unneeded historical data in time? The answer is yes; the key is how that eviction strategy is implemented, and this is the main difference among the three kinds of Join provided by Flink SQL.

Flink SQL Join

  • Regular Join
A Regular Join is the most basic Join, with no cache-eviction policy. All inputs and updates to either table are globally visible and affect all subsequent Join results. For example, in the following Join query, a new record in the Orders table matches all historical and future records in the Product table.

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
Because historical data is never cleaned up, Regular Joins support any kind of update to the input tables (INSERT, UPDATE, DELETE). However, due to resource constraints, Regular Joins are usually not sustainable over the long term and are mostly used only to join bounded data streams.

  • Time-Windowed Join
A Time-Windowed Join uses a window to place a time bound on the Join between the two input tables. Data outside the bound is invisible to the Join and can be cleaned up. A noteworthy issue here is the semantics of time: it can refer either to the system time at which the computation happens (Processing Time) or to the Event Time extracted from a time field in the data itself. For Processing Time, Flink divides the Join time window according to system time and cleans up data periodically; for Event Time, Flink allocates Event Time windows and cleans up data according to the Watermark.

Taking the more common Event Time Windowed Join as an example, a query that joins the Orders table and the Shipments table based on order time and shipping time looks like this:

SELECT *
FROM 
  Orders o, 
  Shipments s
WHERE 
  o.id = s.orderId AND
  s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR
This query sets a time lower bound for the Orders table: o.ordertime > s.shiptime - INTERVAL '4' HOUR (Figure 4).





Figure 4. Time-Windowed Join time lower bound – Orders table

For the Shipments table, the lower bound is s.shiptime >= o.ordertime (Figure 5).





Figure 5. Time-Windowed Join time lower bound – Shipments table

Therefore, both input tables only need to cache data above their time lower bound, keeping state size within a reasonable range.
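The cleanup logic implied by these bounds can be sketched as follows, assuming the 4-hour bound of the query above and timestamps in seconds. This is a simplified illustration, not Flink's actual interval-join operator.

```python
HOUR = 3600  # seconds

def prune_orders(orders, watermark):
    # An order can still match shipments with shiptime in
    # [ordertime, ordertime + 4h], so it must be kept until the
    # Watermark passes ordertime + 4h.
    return [o for o in orders if o["ordertime"] + 4 * HOUR >= watermark]

def prune_shipments(shipments, watermark):
    # A shipment can still match orders with ordertime in
    # [shiptime - 4h, shiptime], so it must be kept until the
    # Watermark passes shiptime.
    return [s for s in shipments if s["shiptime"] >= watermark]

orders = [{"id": 1, "ordertime": 0}, {"id": 2, "ordertime": 6 * HOUR}]
watermark = 5 * HOUR
# Order 1 expired (0 + 4h < 5h); only order 2 remains in state.
print(prune_orders(orders, watermark))
```

Each advance of the Watermark thus moves the lower bound forward and allows more state to be dropped.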

Although the underlying implementation poses no problem, expressing these time attributes in SQL syntax remains difficult. While the concepts of Event Time, Processing Time, and Watermark have become common knowledge in real-time computing, support for time data types in the SQL world is still weak [4]. As a result, defining Watermarks and time semantics requires the programming API, for example converting a DataStream to a Table, rather than SQL alone. The Flink community plans to address this by extending the SQL dialect; interested readers can track progress via FLIP-66 [7].

  • Temporal Table Join
While Time-Windowed Joins solve the resource problem, they also limit the usage scenarios: both input streams of the Join must have a time lower bound, beyond which data is inaccessible. This does not hold for many businesses that join against dimension tables, because in many cases dimension tables have no time bound. To solve this problem, Flink provides the Temporal Table Join.

A Temporal Table Join is similar to a Hash Join: its inputs are divided into a Build Table and a Probe Table. The former is generally the changelog of a dimension table, the latter is generally the business data stream, and in typical cases the data volume of the latter is much larger than that of the former. In a Temporal Table Join, the Build Table is a time-versioned view over an append-only stream, hence also called a Temporal Table. A Temporal Table requires a primary key and a field for versioning (usually the Event Time field) to reflect its contents at different points in time.

A typical example is currency conversion of order amounts. Suppose there is an Orders stream recording order amounts, which needs to be joined with the RatesHistory exchange-rate stream. RatesHistory records the conversion rates of different currencies into yen, appending a new record whenever a rate changes. The contents of the two tables at a certain point in time are as follows:





Figure 6. Temporal Table Join example

We register RatesHistory as a Temporal Table named Rates, with currency as the primary key and time as the version field.





Figure 7. Temporal Table registration

Rates is then parameterized by a time version: given a version, Rates computes from RatesHistory the rate contents valid at that version.





Figure 8. Temporal Table content

With the help of Rates, we can express the business logic with the following query:

SELECT 
  o.amount * r.rate
FROM
  Orders o,
  LATERAL TABLE (Rates(o.time)) r
WHERE
  o.currency = r.currency
It is worth noting that, unlike Regular Join and Time-Windowed Join, where the two tables are symmetric and a new record on either side can match historical records of the other side, in a Temporal Table Join an update to the Temporal Table is not visible to records of the other table whose time precedes that update. This means we only need to keep a Build-side record until the Watermark exceeds that record's version field: since the Probe side, in theory, no longer delivers records earlier than the Watermark, older versions can be safely cleaned up.
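The version lookup and the Build-side cleanup can be sketched in Python as follows. The concrete rate values are assumptions for illustration (the figures are not reproduced here), and this is not Flink's actual operator.

```python
import bisect

# Assumed RatesHistory changelog: currency -> sorted list of (time, rate).
rates_history = {
    "Euro": [(0, 114), (5, 116), (7, 119)],
    "US Dollar": [(0, 102)],
}

def rate_at(currency, t):
    # Temporal lookup: pick the latest version whose time <= t.
    versions = rates_history[currency]
    i = bisect.bisect_right(versions, (t, float("inf"))) - 1
    return versions[i][1]

def temporal_join(orders):
    # Probe side: convert each order amount using its time version.
    return [o["amount"] * rate_at(o["currency"], o["time"]) for o in orders]

def prune_versions(versions, watermark):
    # Build-side cleanup: keep the newest version at or below the
    # Watermark (still the valid rate for future probes) plus all
    # versions after it; everything older can be dropped.
    i = bisect.bisect_right(versions, (watermark, float("inf"))) - 1
    return versions[max(i, 0):]

orders = [{"amount": 2, "currency": "Euro", "time": 4},
          {"amount": 1, "currency": "Euro", "time": 6}]
print(temporal_join(orders))   # [228, 116], i.e. 2*114 and 1*116
```

For example, once the Watermark passes time 5, the Euro version at time 0 can be dropped, because no future probe record can have a time before 5.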

Conclusion

The biggest difference between Joins in real-time Streaming SQL and in offline Batch SQL is that the complete data set cannot be cached; instead, time-based cleanup conditions must be applied to the cache to limit the range of data involved in the Join. Depending on the cleanup strategy, Flink SQL provides Regular Join, Time-Windowed Join, and Temporal Table Join to cover different business scenarios.

In addition, although Joins can be implemented flexibly with low-level programming APIs in real-time computing, Join support in Streaming SQL is still at a relatively early stage. The crux is how to integrate time attributes into SQL properly, and the SQL standard developed by the ISO SQL committee does not yet give a complete answer. Put another way, as one of the earliest pioneers of Streaming SQL, the Flink community is well positioned to explore a reasonable SQL syntax and contribute it back to ISO.








This article is content from Alibaba Cloud and may not be reproduced without permission.