Preface

This article briefly describes the join operation in Kafka Streams.

Join

A join operation merges two streams based on the keys of their data records, and yields a new stream. A join over record streams usually needs to be performed on a windowing basis because otherwise the number of records that must be maintained for performing the join may grow indefinitely.
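To show why a window keeps the join state bounded, here is a minimal in-memory sketch in plain Java with no Kafka dependency. It is a simplified model, not Kafka Streams' implementation (which keeps these buffers in window state stores). The class `WindowedJoinSketch` and its methods are hypothetical. It assumes that records arrive in timestamp order and uses a symmetric window, as `JoinWindows.of(30000)` in the example below does: two records match if their keys are equal and their timestamps are at most 30 seconds apart.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal in-memory model of a windowed inner stream-stream join (not Kafka Streams' code).
// Assumes records arrive in timestamp order, so records older than the window can be evicted.
final class WindowedJoinSketch {

    // A keyed record with an event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowMs;
    private final List<Rec> leftBuf = new ArrayList<>();
    private final List<Rec> rightBuf = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    void onLeft(String key, String value, long ts) {
        process(new Rec(key, value, ts), true);
    }

    void onRight(String key, String value, long ts) {
        process(new Rec(key, value, ts), false);
    }

    private void process(Rec rec, boolean isLeft) {
        // Evict buffered records that are too old to match this (or any later) record.
        long cutoff = rec.ts - windowMs;
        leftBuf.removeIf(r -> r.ts < cutoff);
        rightBuf.removeIf(r -> r.ts < cutoff);

        // Probe the other side's buffer for records with the same key inside the window.
        List<Rec> other = isLeft ? rightBuf : leftBuf;
        for (Rec o : other) {
            if (o.key.equals(rec.key) && Math.abs(o.ts - rec.ts) <= windowMs) {
                String leftValue = isLeft ? rec.value : o.value;
                String rightValue = isLeft ? o.value : rec.value;
                output.add(leftValue + "--" + rightValue);
            }
        }
        // Keep this record so that later records from the other side can match it.
        (isLeft ? leftBuf : rightBuf).add(rec);
    }

    // Number of records currently held for future matches.
    int bufferedRecords() {
        return leftBuf.size() + rightBuf.size();
    }

    public static void main(String[] args) {
        WindowedJoinSketch join = new WindowedJoinSketch(30_000);
        join.onLeft("a", "1,a", 0);
        join.onRight("a", "a,hello", 10_000);
        join.onLeft("a", "1,a", 100_000);
        System.out.println(join.output);            // [1,a--a,hello]
        System.out.println(join.bufferedRecords()); // 1
    }
}
```

Without the eviction step, both buffers would grow with every record ever seen. With it, the state holds at most one window's worth of records per side.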

Example

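        import org.apache.kafka.streams.kstream.JoinWindows;
        import org.apache.kafka.streams.kstream.KStream;
        import org.apache.kafka.streams.kstream.KStreamBuilder;

        // KStreamBuilder is the pre-1.0 API; newer versions use StreamsBuilder instead.
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> left = builder.stream("intpu-left");
        KStream<String, String> right = builder.stream("intpu-right");

        // Re-key both streams so they share the join key:
        // left "1,a" gets key "a", right "a,hello" gets key "a".
        KStream<String, String> leftByKey = left.selectKey((key, value) -> value.split(",")[1]);
        KStream<String, String> rightByKey = right.selectKey((key, value) -> value.split(",")[0]);

        // Inner join: records with equal keys whose timestamps are at most 30 s apart are combined.
        KStream<String, String> all = leftByKey.join(rightByKey,
                (leftValue, rightValue) -> leftValue + "--" + rightValue,
                JoinWindows.of(30000));

        // Print every join result to stdout.
        all.print();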

Because a join matches records by key, the streams usually have to be re-keyed first (here with selectKey) so that records that belong together carry the same key.
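The two selectKey lambdas in the example above only extract one field from the comma-separated value. Pulled out into plain Java (the class `ReKeySketch` and its method names are hypothetical), the mapping shows that `1,a` on the left and `a,hello` on the right end up with the same key `a`:

```java
// Hypothetical helpers mirroring the two selectKey lambdas in the example.
final class ReKeySketch {

    // Left records look like "1,a": the join key is the second comma-separated field.
    // A value without a comma (e.g. "4") throws ArrayIndexOutOfBoundsException, as the original lambda would.
    static String leftJoinKey(String value) {
        return value.split(",")[1];
    }

    // Right records look like "a,hello": the join key is the first field.
    static String rightJoinKey(String value) {
        return value.split(",")[0];
    }

    public static void main(String[] args) {
        System.out.println(leftJoinKey("1,a") + " " + rightJoinKey("a,hello")); // a a
    }
}
```

In Kafka Streams, changing the key with selectKey means the records must be redistributed by the new key before they can be joined. Kafka Streams does this automatically through an internal repartition topic. The inputs of a join must also be co-partitioned (same number of partitions), and both input topics below are created with 3 partitions.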

Test

Create the two input topics (3 partitions each):

        sh bin/kafka-topics.sh --create --topic intpu-left --replication-factor 1 --partitions 3 --zookeeper localhost:2181
        sh bin/kafka-topics.sh --create --topic intpu-right --replication-factor 1 --partitions 3 --zookeeper localhost:2181

Then start a console producer for each topic, in two separate terminals:

        sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-left
        sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic intpu-right

In the intpu-left producer, enter lines such as:

1,a
2,b
3,c
3,c
4,d
1,a
2,b
3,c
1,a
2,b
3,c
4,e
5,h
6,f
7,g

In the intpu-right producer, enter lines such as:

a,hello
b,world
c,hehehe
c,aaa
d,eee
a,cccc
b,aaaaaa
c,332435
a,dddd
b,2324
c,ddddd
e,23453
h,2222222
f,0o0o0o0
g,ssss

Sample output

[KSTREAM-MERGE-0000000014]: a , 1,a--a,dddd
[KSTREAM-MERGE-0000000014]: b , 2,b--b,2324

Join types

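Besides the inner join (join) used above, Kafka Streams also supports a left join (leftJoin) and an outer join (outerJoin). If you also want to keep the records that find no match within the time window, you can use an outer join: it emits those records too, with null for the missing side, and you can then filter them against the records that did match.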

Sample output (outer join)

[KSTREAM-MERGE-0000000014]: f , null--f,ddddddd
[KSTREAM-MERGE-0000000014]: f , 4,f--f,ddddddd
[KSTREAM-MERGE-0000000014]: g , 5,g--null
[KSTREAM-MERGE-0000000014]: h , 6,h--null
[KSTREAM-MERGE-0000000014]: h , 6,h--h,ddddddd
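The sample output above suggests eager emission. A record that has no partner in the window is printed immediately with null for the missing side (`null--f,...`, `6,h--null`), and the joined result follows if a partner arrives later within the window. The following plain-Java simulation reproduces that behavior and the "filter against the matched records" step. It is a sketch under stated assumptions, not Kafka Streams' implementation. The class `OuterJoinSketch` and its methods are hypothetical, records are assumed to arrive in timestamp order, and nothing is evicted. The values come from the sample output, but the timestamps are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified model of an eager windowed outer join (not Kafka Streams' implementation).
// Assumes in-order arrival and keeps every record in memory (no window eviction).
final class OuterJoinSketch {

    static final class Rec {
        final String key;
        final String value;
        final long ts;

        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowMs;
    private final List<Rec> leftBuf = new ArrayList<>();
    private final List<Rec> rightBuf = new ArrayList<>();
    final List<String> output = new ArrayList<>();                 // lines of the form "key , left--right"
    private final Set<String> matchedKeys = new LinkedHashSet<>(); // keys that produced a real match
    private final Set<String> paddedKeys = new LinkedHashSet<>();  // keys emitted with a null side

    OuterJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    void onLeft(String key, String value, long ts) {
        process(new Rec(key, value, ts), true);
    }

    void onRight(String key, String value, long ts) {
        process(new Rec(key, value, ts), false);
    }

    private void process(Rec rec, boolean isLeft) {
        boolean found = false;
        for (Rec o : isLeft ? rightBuf : leftBuf) {
            if (o.key.equals(rec.key) && Math.abs(o.ts - rec.ts) <= windowMs) {
                found = true;
                matchedKeys.add(rec.key);
                emit(rec.key, isLeft ? rec.value : o.value, isLeft ? o.value : rec.value);
            }
        }
        if (!found) {
            // No partner inside the window yet: emit anyway, with null for the missing side.
            paddedKeys.add(rec.key);
            emit(rec.key, isLeft ? rec.value : null, isLeft ? null : rec.value);
        }
        (isLeft ? leftBuf : rightBuf).add(rec);
    }

    private void emit(String key, String leftValue, String rightValue) {
        // String concatenation renders a null reference as "null", like the sample output.
        output.add(key + " , " + leftValue + "--" + rightValue);
    }

    // Keys that were emitted with a null side but never matched afterwards.
    List<String> unmatchedKeys() {
        List<String> result = new ArrayList<>(paddedKeys);
        result.removeAll(matchedKeys);
        return result;
    }

    public static void main(String[] args) {
        OuterJoinSketch join = new OuterJoinSketch(30_000);
        join.onRight("f", "f,ddddddd", 0);
        join.onLeft("f", "4,f", 1_000);
        join.onLeft("g", "5,g", 2_000);
        join.onLeft("h", "6,h", 3_000);
        join.onRight("h", "h,ddddddd", 4_000);
        join.output.forEach(System.out::println);
        System.out.println(join.unmatchedKeys()); // [g]
    }
}
```

Because a null-padded result (as for f and h) can be followed by a real match, the records that truly stayed unmatched are the padded keys minus the keys that eventually matched. In this data that leaves only g. A left join would follow the same logic, except that only left-side records are padded with null; right-side records without a partner produce no output.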

Summary

Kafka Streams joins are well suited to matching records from different data sources in real time.