Technical principles

  • Data shuffle: in a dual-stream join, Flink partitions both streams by the join key in the ON clause, so that records with the same key from the two streams are processed on the same node.

  • Data saving: because the two streams arrive at different rates, the join maintains a leftState and a rightState to buffer data (see the sketch after this list):

    • When a LeftEvent arrives it is stored in leftState; when a RightEvent arrives it is stored in rightState.
    • A LeftEvent is joined against rightState, and all matched results are sent downstream.
    • A RightEvent is joined against leftState, and all matched results are sent downstream.
  • Inner join: events from both sides of the dual-stream join are stored in state, and a result is output only when the join condition is met.

  • Left outer join: when there is no joinable event in the right stream yet, the right-side columns are filled with NULL and the padded row is emitted downstream. When a joinable event later arrives in the right stream, the NULL-padded row is retracted and the fully joined row (with the right-side columns populated) is sent downstream.

  • Right outer join: the same behavior, with the two sides swapped.
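A minimal sketch of this leftState/rightState mechanism, written as a KeyedCoProcessFunction that keeps one ListState per side. This illustrates the principle only, not Flink's internal join operator; the class name and event types are made up.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative inner join: both inputs are assumed to be keyed by the join key (a String).
public class SimpleInnerJoin extends KeyedCoProcessFunction<String, String, String, String> {

    private transient ListState<String> leftState;
    private transient ListState<String> rightState;

    @Override
    public void open(Configuration parameters) {
        leftState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("leftState", String.class));
        rightState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("rightState", String.class));
    }

    @Override
    public void processElement1(String leftEvent, Context ctx, Collector<String> out) throws Exception {
        // LeftEvent: store in leftState, then join against everything buffered on the right.
        leftState.add(leftEvent);
        for (String rightEvent : rightState.get()) {
            out.collect(leftEvent + " JOIN " + rightEvent);
        }
    }

    @Override
    public void processElement2(String rightEvent, Context ctx, Collector<String> out) throws Exception {
        // RightEvent: store in rightState, then join against everything buffered on the left.
        rightState.add(rightEvent);
        for (String leftEvent : leftState.get()) {
            out.collect(leftEvent + " JOIN " + rightEvent);
        }
    }
}

An outer join would additionally emit a NULL-padded result when the other side's state is empty and retract it once a match arrives, which is the behavior described for left/right outer joins above.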

Join types

Unbounded JOIN

1. Regular join

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

Features:

  • Unbounded JOIN

  • The data flowing in on both sides is persisted in state, which keeps growing over time

  • The result of the join is a retract stream (i.e. the emitted results are inserts/deletes), and each record from one stream is joined against all past and future records of the other stream.
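As a hedged illustration of that retract stream, assuming Orders and Product are already registered as unbounded tables and using the same pre-1.11 Table API style as the demo later in this post:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Orders and Product are assumed to be registered as (unbounded) tables.
Table joined = tEnv.sqlQuery(
    "SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id");

// Each element is (true, row) for an insert and (false, row) for a retraction.
tEnv.toRetractStream(joined, Row.class).print();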

Bounded JOIN

1. Window Join

A window-time-oriented join on KeyedStreams: it joins elements that have the same key and fall into the same time window.

Different window joins are available depending on the window type (tumbling/sliding/session). The window's time range is closed on the left and open on the right, e.g. [5, 10).

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

Join logic is implemented through JoinFunction/FlatJoinFunction.

Watermark definition for an event-time window join

In an event-time window join, both streams define watermarks; the join window advances with the slower (smaller) of the two watermarks.
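For concreteness, a hedged sketch of a tumbling event-time window join; the stream contents, key field and 5-second window size are made up, and both inputs are assumed to already carry event-time timestamps and watermarks:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// (orderId, amount) and (orderId, payment) streams; timestamps/watermarks already assigned.
DataStream<Tuple2<String, Integer>> orders = ...
DataStream<Tuple2<String, Integer>> payments = ...

orders
    .join(payments)
    .where(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> order) { return order.f0; }
    })
    .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> payment) { return payment.f0; }
    })
    // Elements whose event times fall into the same 5-second tumbling window are joined.
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
        @Override
        public String join(Tuple2<String, Integer> order, Tuple2<String, Integer> payment) {
            return order.f0 + ": " + order.f1 + " / " + payment.f1;
        }
    });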

2. Interval Join

An event-time-oriented join on KeyedStreams: it joins elements that have the same key and whose event times lie within a configured lower/upper bound of each other.

By default both bounds are inclusive (a closed time interval), that is:

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

The bounds can be made exclusive with .lowerBoundExclusive() / .upperBoundExclusive().

Since interval join only supports event time, watermarks drive the progress of the event streams.

Watermark definition

currentWatermark = Min(greenElem.ts, orangeElem.ts) - upperBound

The elements whose event time is less than the current watermark are regarded as expired data and will be cleared.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    // Define the time interval
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

SQL:

SELECT *
FROM
  Orders o,
  Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

JOIN LATERAL

The joins above are dual-stream driven. JOIN LATERAL is a single-stream driven join: for each row of the left table, the right table (or view) is dynamically computed and joined.

LATERAL and CROSS APPLY have the same semantics

Example:

# CROSS APPLY
SELECT 
	c.customerid, c.city, o.orderid 
FROM Customers c CROSS APPLY ( 
  SELECT 
  	o.orderid, o.customerid 
  FROM Orders o 
  WHERE o.customerid = c.customerid 
) AS o 
# LATERAL
SELECT 
	e.NAME, e.DEPTNO, d.NAME 
FROM EMPS e, LATERAL ( 
  SELECT * 
  FROM DEPTS d 
  WHERE e.DEPTNO = d.DEPTNO 
) AS d; 
# inner join
SELECT users, tag 
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag;
# left outer join
SELECT users, tag 
FROM Orders 
LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;

Features:

  • The right side is not a physical table but a VIEW or a table-valued function (i.e. a join against a function)
  • Single-stream driven, driven by the main (left) stream
  • An ordinary subquery in the FROM clause cannot reference the table to its left; LATERAL lifts this restriction, which is why the subqueries above can reference c and e.

Use scenarios of JOIN LATERAL in Flink

  • UDTF (TVF) – user-defined table function (see the sketch below)
  • Temporal Table
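A hedged sketch of the UDTF case: a user-defined table function that splits a comma-separated tags string into rows, matching the unnest_udtf used in the SQL above (the class name and the tags format are illustrative):

import org.apache.flink.table.functions.TableFunction;

// Each call to collect() emits one row of the lateral "right table".
public class UnnestUdtf extends TableFunction<String> {
    public void eval(String tags) {
        for (String tag : tags.split(",")) {
            collect(tag);
        }
    }
}

After tEnv.registerFunction("unnest_udtf", new UnnestUdtf()); it can be used in LATERAL TABLE(unnest_udtf(tags)) exactly as in the inner/left join SQL shown earlier.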

JOIN Temporal Table

Temporal table: a view of a table's contents at a specific point in time in history, conceptually stored as a map of key = update time, value = version snapshot.

Processing-time Temporal Joins

Only the current version of the temporal table is kept. The join always runs against the version of the base table that is current at system (processing) time; it is not possible to join against a historical version. Updates to the base table do not affect previously emitted join results.

Event-time Temporal Joins

All versions of the temporal table up to the current watermark are kept. The join runs against the latest version of the base table as of the specified event time; that is, historical versions of the base table can be joined by event time.

For example, if the event time is set to 12:00 and the current system time is already 13:00, the join will associate only the base table of the latest version as of 12:00.

Demo

  1. Based on the primary key/time attribute field, create and register the temporal table function

    Call the createTemporalTableFunction method, which is defined as follows:

    def createTemporalTableFunction(
        timeAttribute: String,
        primaryKey: String): TemporalTableFunction = {
      createTemporalTableFunction(
        ExpressionParser.parseExpression(timeAttribute),
        ExpressionParser.parseExpression(primaryKey))
    }

  2. Use the Temporal Table to join in SQL

A temporal table function is a special type of table function; the parameter of its eval function is the time field.

Example:

import org.apache.flink.table.functions.TemporalTableFunction;
(...)

// Get the stream and table environments.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Provide a static data set of the rates history table.
List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
ratesHistoryData.add(Tuple2.of("US Dollar".102L));
ratesHistoryData.add(Tuple2.of("Euro".114L));
ratesHistoryData.add(Tuple2.of("Yen".1L));
ratesHistoryData.add(Tuple2.of("Euro".116L));
ratesHistoryData.add(Tuple2.of("Euro".119L));

// Create and register an example table using above data set.
// In the real setup, you should replace this with your own table.
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");

tEnv.registerTable("RatesHistory", ratesHistory);

// Create and register a temporal table function.
// Define "r_proctime" as the time attribute and "r_currency" as the primary key.
// create TemporalTableFunction
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
// register TemporalTableFunction in the environment
tEnv.registerFunction("Rates", rates);  

Use:

Rates() is the TemporalTableFunction registered above.

SELECT o.currency, o.amount, r.rate,
  o.amount * r.rate AS yen_amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

Applications

Dimension table JOIN

Static dimension table JOIN

Asynchronous I/O query + cache; choose the cache strategy based on the size of the dimension table: full cache or LRU cache.
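A minimal sketch of the async I/O + cache pattern (the lookup method, the cache choice and all names are illustrative; a real job would query an external store such as MySQL/HBase/Redis):

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class DimLookupFunction extends RichAsyncFunction<String, String> {

    // Full cache for a small dimension table; an LRU cache would be used for a large one.
    private transient Map<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        cache = new ConcurrentHashMap<>();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        String cached = cache.get(key);
        if (cached != null) {
            resultFuture.complete(Collections.singleton(key + "," + cached));
            return;
        }
        // Asynchronous lookup against the external dimension store (placeholder).
        CompletableFuture.supplyAsync(() -> lookupDimTable(key))
            .thenAccept(value -> {
                cache.put(key, value);
                resultFuture.complete(Collections.singleton(key + "," + value));
            });
    }

    private String lookupDimTable(String key) {
        return "dim-value-of-" + key;  // placeholder for the real query
    }
}

// Usage (timeout and capacity are illustrative):
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         mainStream, new DimLookupFunction(), 1000, TimeUnit.MILLISECONDS, 100);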

Dynamic dimension table JOIN

Listen for changes in the dimension table and dynamically update the dimension table

Depending on whether version snapshots are required: if they are, define an event-time attribute and implement the join with a temporal table.

Comparison

  • Dual-stream JOIN: dual-stream driven; every incoming record on either side can trigger the join computation.
  • LATERAL JOIN: stream JOIN table function (a join between a single stream and a UDTF). Single-stream driven; the table view is generated dynamically from the stream data. No state management.
  • Temporal Table JOIN: stream JOIN temporal table (a join between a single stream and a versioned table). Built on LATERAL JOIN, single-stream driven, with management of historical version state.

Optimizations

  • Constructing a PK source: a PK cannot be defined on the source connector, so joining produces redundant results from historical rows with the same PK

    • Solution: when joining, take only the latest row per PK from the event stream. Blink supports LAST_VALUE for this (see Apache Flink Ramble Series (07) – Continuous Queries, and the sketch after this list)
  • Hot spots caused by NULL: in A LEFT JOIN B, many (a, NULL) rows are produced whose cCol from B is NULL; when that result is then LEFT OUTER JOINed with C on cCol, all rows with a NULL cCol land on the same node and create a hotspot

    • Solution: JOIN ReOrder, i.e. change the order of the joins (reverse the join order)
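A hedged sketch of the PK deduplication idea, assuming a Blink-style LAST_VALUE aggregate is available (table and column names are made up, and tEnv is a StreamTableEnvironment as in the demo above):

import org.apache.flink.table.api.Table;

// Keep only the latest row per primary key before joining, so that historical rows
// with the same PK do not produce redundant join results.
Table latestPerPk = tEnv.sqlQuery(
    "SELECT pk, LAST_VALUE(col) AS col " +
    "FROM SourceTable " +
    "GROUP BY pk");
tEnv.registerTable("DedupedSource", latestPerPk);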

References

Continuous Queries

Apache Flink Ramble Series (09) – JOIN operator

Apache Flink Ramble Series (10) – JOIN LATERAL

Apache Flink Ramble Series (11) – Temporal Table JOIN

Apache Flink Ramble Series (12) – Time Interval(time-windowed) JOIN

Flink 1.7 New features: Temporal Tables and MATCH_RECOGNIZE

Flink SQL function decryption series — Dimension table JOIN and asynchronous optimization