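An inner join on unbounded streams behaves like a join over a global window: every earlier message from both sides is kept in state, so whenever a new message arrives it is matched against all previously seen messages with the same key on the other side, and the output is the Cartesian product of the new message with those matches.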

package SQL; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.Random; import java.util.concurrent.TimeUnit; /** * @author you guess * @date 2021/1/6 15:22 * @version 1.0 * @desc */ public class DataStreamSql1_Join {private static final Logger LOG = LoggerFactory.getLogger(MinMinByMaxMaxBy.MinMinByMaxMaxByTest.class); Private static final String[] TYPE = {"a ", "b ", "c ", "d ", "e "}; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, envSettings); DataStreamSource<Order> orderSourceA = env.addSource(new SourceFunction<Order>() { private volatile boolean isRunning = true; private final Random random = new Random(); @Override public void run(SourceContext<Order> ctx) throws Exception { while (isRunning) { TimeUnit.SECONDS.sleep(1); Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10))); System.out.println(new Date() + ",orderSourceA submit element :" + order); ctx.collect(order); } } @Override public void cancel() { isRunning = false; } }, "order-infoA"); DataStreamSource<Order> orderSourceB = env.addSource(new SourceFunction<Order>() { private volatile boolean isRunning = true; private final Random random = new Random(); @Override public void run(SourceContext<Order> ctx) throws Exception { while (isRunning) { 
TimeUnit.SECONDS.sleep(1); Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10))); System.out.println(new Date() + ",orderSourceB submit element :" + Order); ctx.collect(order); } } @Override public void cancel() { isRunning = false; } }, "order-infoB"); stEnv.registerDataStream("tableA", orderSourceA); stEnv.registerDataStream("tableB", orderSourceB); Table result = stEnv.sqlQuery("SELECT A.name,A.qtty,B.qtty from tableA A inner join tableB B on A.name = B.name"); stEnv.toRetractStream(result, Row.class).print(); Env.execute ("Flink Streaming Java API Skeleton"); } /** * Simple POJO. */ public static class Order { public String name; public Long qtty; public Order() { } public Order(String name, Long qtty) { this.name = name; this.qtty = qtty; } @Override public String toString() { return "Order{" + "name='" + name + '\'' + ", qtty=" + qtty + '}'; }}}Copy the code

Wed Jan 06 15:28:04 CST 2021,orderSourceA commit element :Order{name=’d grape ‘, qtty=9}

Wed Jan 06 15:28:04 CST 2021,orderSourceB commit element :Order{name=’b pear ‘, qtty=8}

Wed Jan 06 15:28:05 CST 2021,orderSourceA commit element :Order{name=’b pear ‘, qtty=4}

Wed Jan 06 15:28:05 CST 2021,orderSourceB commit element :Order{name=’b pear ‘, qtty=6}

5> (true,b pear,4,8)

5> (true,b pear,4,6)

Wed Jan 06 15:28:06 CST 2021,orderSourceA commit element :Order{name=’a apple ‘, qtty=3}

Wed Jan 06 15:28:06 CST 2021,orderSourceB commit element :Order{name=’e ‘, qtty=1}

Wed Jan 06 15:28:07 CST 2021,orderSourceA commit element :Order{name=’e ‘, qtty=4}

Wed Jan 06 15:28:07 CST 2021,orderSourceB commit element :Order{name=’a apple ‘, qtty=6}

9> (true,a apple,3,6)

4> (true,e dragonfruit,4,1)

Wed Jan 06 15:28:08 CST 2021,orderSourceB commit element :Order{name=’e ‘, qtty=9}

Wed Jan 06 15:28:08 CST 2021,orderSourceA commit element :Order{name=’e dragon fruit ‘, qtty=6}

4> (true,e dragon fruit,4,9)

4> (true,e dragonfruit,6,1)

4> (true,e dragonfruit,6,9)

Wed Jan 06 15:28:09 CST 2021,orderSourceA commit element :Order{name=’e ‘, qtty=1}

Wed Jan 06 15:28:09 CST 2021,orderSourceB commit element :Order{name=’e ‘, qtty=2}

4> (true,e dragonfruit,4,2)

4> (true,e dragonfruit,6,2)

4> (true,e dragonfruit,1,1)

4> (true,e dragon fruit,1,2)

4> (true,e dragon fruit,1,9)

Wed Jan 06 15:28:10 CST 2021,orderSourceA commit element :Order{name=’c watermelon ‘, qtty=5}

Wed Jan 06 15:28:10 CST 2021,orderSourceB commit element :Order{name=’e ‘, qtty=6}

4> (true,e dragonfruit,4,6)

4> (true,e dragonfruit,6,6)

4> (true,e dragonfruit,1,6)

Wed Jan 06 15:28:11 CST 2021,orderSourceA commit element :Order{name=’c watermelon ‘, qtty=6}

Wed Jan 06 15:28:11 CST 2021,orderSourceB commit element :Order{name=’c watermelon ‘, qtty=0}

8> (true,c watermelon,5,0)

8> (true,c watermelon,6,0)

Wed Jan 06 15:28:12 CST 2021,orderSourceA commit element :Order{name=’b pear ‘, qtty=2}

Wed Jan 06 15:28:12 CST 2021,orderSourceB commit element :Order{name=’d grape ‘, qtty=7}

3> (true,d grape,9,7)

5> (true,b pear,2,8)

5> (true,b pear,2,6)

Wed Jan 06 15:28:13 CST 2021,orderSourceB commit element :Order{name=’d grape ‘, qtty=1}

Wed Jan 06 15:28:13 CST 2021,orderSourceA commits elements :Order{name=’d grape ‘, qtty=5}

3> (true,d grape,9,1)

3> (true,d grape,5,1)

3> (true,d grape,5,7)

Wed Jan 06 15:28:14 CST 2021,orderSourceB commit element :Order{name=’d grape ‘, qtty=5}

Wed Jan 06 15:28:14 CST 2021,orderSourceA commit element :Order{name=’e dragon fruit ‘, qtty=9}

3> (true,d grape,5,5)

3> (true,d grape,9,5)

4> (true,e dragonfruit,9,6)

4> (true,e dragonfruit,9,1)

4> (true,e dragonfruit,9,2)

4> (true,e dragonfruit,9,9)

Wed Jan 06 15:28:15 CST 2021,orderSourceB commit element :Order{name=’e dragon fruit ‘, qtty=5}

Wed Jan 06 15:28:15 CST 2021,orderSourceA commit element :Order{name=’e dragon fruit ‘, qtty=4}

4> (true,e dragonfruit,4,5)

4> (true,e dragonfruit,6,5)

4> (true,e dragon fruit,1,5)

4> (true,e dragonfruit,9,5)

4> (true,e dragonfruit,4,5)

4> (true,e dragonfruit,4,6)

4> (true,e dragonfruit,4,1)

4> (true,e dragonfruit,4,2)

4> (true,e dragon fruit,4,9)

The Table/SQL API syntax varies considerably between Flink versions; the example above was run with:

<flink.version>1.9.2</flink.version>

jdk1.8