Tips | Flink: using union instead of join/cogroup

Each article in this series is short, updated from time to time, and starts from practical cases to help readers improve their knowledge and skills. This article introduces how to use union instead of cogroup (or join) in Flink to simplify task logic and improve task performance, while still meeting the original requirements and implementing the original logic. Reading time is about one minute.

Requirement Scenario Analysis

Requirement scenario

The requirement comes from our data product sister: she wants to collect five real-time metrics for a single short video (“play”, “like”, “comment”, “share”, and “report”) and aggregate them into a real-time video consumption wide table at photo_id and 1-minute time granularity (the wide table fields are at least “photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp”), which feeds a real-time dashboard.

The problem is that for the same video, the five types of consumption behaviors have different trigger mechanisms and reporting times, so the five types of behavior logs arrive as five different real-time data sources. A SQL boy naturally reaches for a join to merge the five types of consumption behavior logs, but is a real-time join (cogroup) really the right fit here?

Source input and features

First, let's analyze the source characteristics of the requirement:

  • Detail data at photo_id granularity for play, like, comment, share, and negative (report): if a user plays (likes, comments on…) a video n times, the client uploads n play (like, comment…) logs to the data source.
  • The source schema of all five types of video consumption behavior logs is “photo_id + timestamp + other dimensions”.

Sink output and features

Sink features are as follows:

  • Aggregated play, like, comment, share, and negative counts at photo_id granularity
  • Real-time video consumption sink schema: “photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp”

Source and Sink example data

Source data:

photo_id  timestamp            user_id  action
1         2020/10/3 11:30:33   3        play
1         2020/10/3 11:30:33   4        play
1         2020/10/3 11:30:33   5        play
1         2020/10/3 11:30:33   4        like
2         2020/10/3 11:30:33   5        like
1         2020/10/3 11:30:33   5        comment

Sink data:

photo_id  timestamp            play_cnt  like_cnt  comment_cnt
1         2020/10/3 11:30:00   3         1         1
2         2020/10/3 11:30:00   0         1         0

Now that we have fully analyzed the inputs and outputs of the data sources, let's see which solutions can meet the requirement.

Implementation scheme

  • Scheme 1 (the cogroup scheme in this article): consume the original logs directly and use cogroup or join to perform windowed aggregation over the five types of video consumption behavior logs
  • Scheme 2: compute minute-granularity metrics separately for each of the five types of video consumption behavior logs, then merge the aggregated metrics by photo_id downstream
  • Scheme 3 (the union scheme in this article): since the data source schemas are the same, union the five log streams directly and compute all five metrics in a single subsequent window function; the design process of the union scheme is introduced later

Let's start with sample code for the cogroup scheme.

cogroup

The cogroup implementation example is as follows. The sample code uses processing time directly (it could be replaced with event time), which lets us ignore the data source timestamps:

public class Cogroup {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Long -> photo_id
        DataStream<Long> play = SourceFactory.getDataStream(xxx);
        // Long -> photo_id
        DataStream<Long> like = SourceFactory.getDataStream(xxx);
        // Long -> photo_id
        DataStream<Long> comment = SourceFactory.getDataStream(xxx);
        // Long -> photo_id
        DataStream<Long> share = SourceFactory.getDataStream(xxx);
        // Long -> photo_id
        DataStream<Long> negative = SourceFactory.getDataStream(xxx);

        // Tuple3 -> photo_id + play_cnt + like_cnt
        DataStream<Tuple3<Long, Long, Long>> playAndLikeCnt = play
            .coGroup(like)
            .where(KeySelectorFactory.get(Function.identity()))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx1);

        // Tuple4 -> photo_id + play_cnt + like_cnt + comment_cnt
        DataStream<Tuple4<Long, Long, Long, Long>> playAndLikeAndComment = playAndLikeCnt
            .coGroup(comment)
            .where(KeySelectorFactory.get(playAndLikeModel -> playAndLikeModel.f0))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx2);

        // Tuple5 -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt
        DataStream<Tuple5<Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShare = playAndLikeAndComment
            .coGroup(share)
            .where(KeySelectorFactory.get(playAndLikeAndCommentModel -> playAndLikeAndCommentModel.f0))
            .equalTo(KeySelectorFactory.get(Function.identity()))
            .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
            .apply(xxx3);

        // Tuple7 -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp
        // Merge in the negative (report) data the same way as above
        DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> playAndLikeAndCommentAndShareAndNegative = ***;

        env.execute();
    }
}
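The apply placeholders (xxx1, xxx2, xxx3) are omitted above. As a minimal sketch, here is what xxx1 could look like: a CoGroupFunction that counts plays and likes per photo_id within the window. The class and variable names are assumptions for illustration, not the original code.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

// Counts plays and likes for one photo_id within one window.
public class PlayLikeCoGroup implements CoGroupFunction<Long, Long, Tuple3<Long, Long, Long>> {

    @Override
    public void coGroup(Iterable<Long> plays, Iterable<Long> likes,
                        Collector<Tuple3<Long, Long, Long>> out) {
        long photoId = -1L;
        long playCnt = 0L;
        long likeCnt = 0L;
        for (Long id : plays) {
            photoId = id;
            playCnt++;
        }
        for (Long id : likes) {
            photoId = id;
            likeCnt++;
        }
        if (photoId != -1L) {
            out.collect(Tuple3.of(photoId, playCnt, likeCnt));
        }
    }
}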


It's not that simple, though. Let's do a more detailed analysis.

Potential problems with the above implementation

  • From the moment Flink consumes a play log to the moment the final aggregated record is emitted, the end-to-end delay exceeds 3 minutes, because the data must pass through several cascaded 1-minute windows, each of which can hold a record for up to a minute
  • If data sources keep increasing (for example, other video consumption action sources are added), the job accumulates more and more operators, the data link grows longer, job stability degrades, and the output delay grows with every additional window

“Data product sister”: 🤩 Nice work, little brother. Now that the problem points have been identified, will the tech little brother also solve them?

“Tech little brother”: Will do.

Since too many windows can cause output delay and job instability, is there a way to reduce the number of windows? Working backwards from the goal that the whole job should contain only one window operator, we can derive the following data link.

The reverse link

Steps 1 through 5 form the reverse derivation.

  • 1. The data for all five metrics is computed in a single window
  • 2. The five metrics therefore share the same window model
  • 3. They also share the same keyBy key (photo_id)
  • 4. The five data sources are all at photo_id granularity, so as long as their models are the same, they can be merged
  • 5. The union operator can merge the five data sources!!

Without further ado, here is the union scheme code.

union

public class Union {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tuple2 -> photo_id + "PLAY" tag
        DataStream<Tuple2<Long, String>> play = SourceFactory.getDataStream(xxx);
        // Tuple2 -> photo_id + "LIKE" tag
        DataStream<Tuple2<Long, String>> like = SourceFactory.getDataStream(xxx);
        // Tuple2 -> photo_id + "COMMENT" tag
        DataStream<Tuple2<Long, String>> comment = SourceFactory.getDataStream(xxx);
        // Tuple2 -> photo_id + "SHARE" tag
        DataStream<Tuple2<Long, String>> share = SourceFactory.getDataStream(xxx);
        // Tuple2 -> photo_id + "NEGATIVE" tag
        DataStream<Tuple2<Long, String>> negative = SourceFactory.getDataStream(xxx);

        // Tuple7 -> photo_id + play_cnt + like_cnt + comment_cnt + share_cnt + negative_cnt + minute_timestamp
        DataStream<Tuple7<Long, Long, Long, Long, Long, Long, Long>> consumptionCnt = play
            .union(like)
            .union(comment)
            .union(share)
            .union(negative)
            .keyBy(KeySelectorFactory.get(i -> i.f0))
            .timeWindow(Time.seconds(60))
            .process(xxx);

        env.execute();
    }
}
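The process placeholder (xxx) above is where the actual counting happens. A minimal sketch of what it could look like, assuming the Tuple2<photo_id, tag> input and the Tuple7 output shown above (the tag constants mirror the snippet; the class name and everything else is illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// For one photo_id and one 1-minute window: count each behavior tag and
// emit a single wide record matching the sink schema.
public class ConsumptionCountFunction extends
        ProcessWindowFunction<Tuple2<Long, String>,
                Tuple7<Long, Long, Long, Long, Long, Long, Long>, Long, TimeWindow> {

    @Override
    public void process(Long photoId, Context ctx,
                        Iterable<Tuple2<Long, String>> elements,
                        Collector<Tuple7<Long, Long, Long, Long, Long, Long, Long>> out) {
        long play = 0, like = 0, comment = 0, share = 0, negative = 0;
        for (Tuple2<Long, String> e : elements) {
            switch (e.f1) {
                case "PLAY":     play++;     break;
                case "LIKE":     like++;     break;
                case "COMMENT":  comment++;  break;
                case "SHARE":    share++;    break;
                case "NEGATIVE": negative++; break;
            }
        }
        // minute_timestamp = window start
        out.collect(Tuple7.of(photoId, play, like, comment, share, negative,
                ctx.window().getStart()));
    }
}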


Notice that no matter how the upstream data sources change, the union scheme only needs to maintain a single window operator to process and aggregate the data, which solves both the data delay problem and the problem of too many operators in the Flink job.

Whenever the data source schemas are the same (or differ but can be normalized into the same format), or the processing logic is the same, you can use union to simplify the logic.
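For the “different schemas normalized into the same format” case, a minimal sketch (rawPlayLogs / rawLikeLogs and their photoId field are hypothetical; only the Flink calls are real):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// Map each raw stream into the common (photo_id, tag) shape before the union.
// The explicit returns(...) is needed because lambdas erase Tuple generics.
DataStream<Tuple2<Long, String>> play = rawPlayLogs
    .map(log -> Tuple2.of(log.photoId, "PLAY"))
    .returns(Types.TUPLE(Types.LONG, Types.STRING));

DataStream<Tuple2<Long, String>> like = rawLikeLogs
    .map(log -> Tuple2.of(log.photoId, "LIKE"))
    .returns(Types.TUPLE(Types.LONG, Types.STRING));

DataStream<Tuple2<Long, String>> unioned = play.union(like); // …and so on for the other sources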

conclusion

The second part of this article analyzed how to meet this requirement with cogroup (with sample code) and the problems that implementation may have, and then walked through the reverse derivation and design ideas of the union solution. The third part optimized the scenario by using union instead of cogroup. If you have a better optimization plan for this scenario, feel free to leave a comment.

You can get Flink learning materials and videos by replying with the keyword Flink in the official account mangodata.