Public account: Java big data and data warehouse, get data, learn big data technology.

The common application scenarios of TopN, the most popular products purchased, the most popular authors read, etc.

1. Use knowledge points

  • Flink creates kafka data sources;
  • How to specify Watermark based on EventTime processing;
  • Flink’s Window, tumbling Window and sliding Window.
  • The use of State;
  • ProcessFunction implements TopN function;

2. Case introduction

Based on the user access logs, the topN of the most active users in a recent period of time is calculated.

  • Create a kafka producer and send test data to Kafka.
  • Consume Kafka data, using the sliding window, updating the rankings at regular intervals;

3. The data source

Here we use the Kafka API to send test data to Kafka with the following code:

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User {

    private long id;
    private String username;
    private String password;
    private long timestamp;
}

Map<String, String> config = Configuration.initConfig("commons.xml");

@Test
public void sendData(a) throws InterruptedException {
    int cnt = 0;

    while (cnt < 200){
        User user = new User();
        user.setId(cnt);
        user.setUsername("username" + new Random().nextInt((cnt % 5) + 2));
        user.setPassword("password" + cnt);
        user.setTimestamp(System.currentTimeMillis());
        Future<RecordMetadata> future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user));
        while(! future.isDone()){ Thread.sleep(100);
        }
        try {
            RecordMetadata recordMetadata = future.get();
            System.out.println(recordMetadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("Send a message:" + cnt + "* * * * * *" + user.toString());
        cnt = cnt + 1; }}Copy the code

Here we use random numbers to scramble the username to make it easier to use different username sizes to make the result more obvious. KafkaUtil KafkaUtil is a kafka tool class written by myself. The code is very simple.

4. Main procedures

Create a main program and start writing the code.

Create flink environment and associate kafka data source.

Map<String, String> config = Configuration.initConfig("commons.xml");

Properties kafkaProps = new Properties();
kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper"));
kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
kafkaProps.setProperty("group.id", config.get("kafka-groupid"));

StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
Copy the code

EventTime and Watermark

senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Copy the code

Set properties senv. SetStreamTimeCharacteristic (TimeCharacteristic. EventTime), time field to deal with according to the data, the default is TimeCharacteristic ProcessingTime

/** The time characteristic that is used if none other is set. */
	private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
Copy the code

This property must be set, otherwise the end of the window may not trigger later, resulting in no output. There are three values:

  • ProcessingTime: Time at which the event is processed. That is, the system time of the Flink cluster machine is determined.
  • EventTime: Indicates the time when an event occurs. Generally, it is the time that the data itself carries.
  • IngestionTime: IngestionTime, the time at which data enters the flink stream, which is different from ProcessingTime;

Specified using the data of actual time to deal with, then you need to specify the flink program how to get to the data of time field, call the DataStream assignTimestampsAndWatermarks method used here, extraction time and setting up the watermark.

senv.addSource(
        new FlinkKafkaConsumer010<>(
                config.get("kafka-topic"),
                new SimpleStringSchema(),
                kafkaProps
        )
).map(x ->{
    return JSON.parseObject(x, User.class);
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<User>(Time.milliseconds(1000)) {
    @Override
    public long extractTimestamp(User element) {
        returnelement.getTimestamp(); }})Copy the code

As you can see from the previous code, the User object is converted to a JSON string when sent to Kafka. In this case, fastJSON is used, which can be converted to JsonObject. I’ll just convert it to the User object json.parseObject (x, user.class) for easy handling.

Here given data may order, use the abstract class can handle out-of-order BoundedOutOfOrdernessTimestampExtractor, and realized the method to realize the only one not extractTimestamp, out-of-order data, will lead to data latency, A time.milliseconds (1000) is passed in the constructor, indicating that the data can be delayed for a second. For example, if the window length is 10s, data from 0 10s will be calculated at 11s, and the watermark value is 10, the calculation will be triggered. In other words, the watermark will tolerate data from 0 10S, and the data will arrive at t+1 at the latest.

See this article for more on watermark

Blog.csdn.net/qq_39657909…

Window statistical

In terms of business requirements, it may be one hour or the data of the past 15 minutes, and the ranking will be updated every 5 minutes. Here, to demonstrate the effect, the window length is set to 10 seconds, and the ranking data of the past 10 seconds will be updated every 5 seconds by sliding for 5 seconds.

.keyBy("username")
.timeWindow(Time.seconds(10), Time.seconds(5))
.aggregate(new CountAgg(), new WindowResultFunction())
Copy the code

KeyBy (“username”) is used to group the users, and timeWindow(Time size, Time slide) is used for each user (10 seconds window, 5 seconds slide). Then we use. Aggregate (AggregateFunction AF, WindowFunction WF) for incremental aggregation operation, which can use AggregateFunction to aggregate data in advance and reduce the storage pressure of state. Compared to.apply(WindowFunction wf), which stores all the data in the window, the final calculation is much more efficient. The first parameter of the aggregate() method is used

AggregateFunction CountAgg implements the AggregateFunction interface, which counts the number of entries in the window.

public class CountAgg implements AggregateFunction<User.Long.Long>{
    @Override
    public Long createAccumulator(a) {
        return 0L;
    }

    @Override
    public Long add(User value, Long accumulator) {
        return accumulator + 1;
    }

    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }

    @Override
    public Long merge(Long a, Long b) {
        returna + b; }}Copy the code

. Aggregate (AggregateFunction af, WindowFunction wF) The second parameter WindowFunction outputs the aggregated results of each key and each window with other information. The WindowResultFunction we’ve implemented here encapsulates the user name, window, and page views as UserViewCount.

private static class WindowResultFunction implements WindowFunction<Long.UserViewCount.Tuple.TimeWindow> {


    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<UserViewCount> out) throws Exception {
        Long count = input.iterator().next();
        out.collect(newUserViewCount(((Tuple1<String>)key).f0, window.getEnd(), count)); }}@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public static class UserViewCount {
    private String userName;
    private long windowEnd;
    private long viewCount;

}
Copy the code

TopN counts the most active users

To count the active users under each window, we need to group them by window again, here keyBy() based on windowEnd in UserViewCount. You then use ProcessFunction to implement a custom TopN function, TopNHotItems, to calculate the top 3 clickers and format the ranking results into strings for subsequent output.

.keyBy("windowEnd")
.process(new TopNHotUsers(3))
.print();
Copy the code

ProcessFunction is a low-level API provided by Flink for more advanced functionality. It mainly provides timer function (support EventTime or ProcessingTime). In this case, we will use timer to determine when the access data of all users under a window is collected. Because the Watermark progress is global, in the processElement method, we register a windowEnd+1 timer every time we receive a piece of data (ItemViewCount) (the Flink framework automatically ignores double registrations at the same time). When the windowEnd+1 timer is triggered, it means that the Watermark of windowEnd+1 is received, that is, all user window statistics under that windowEnd are collected. In onTimer(), we sort all the collected items and clicks, select TopN, format the ranking information into a string and output it.

Here we also use ListState

to store each UserViewCount message received to ensure that state data is not lost and consistent in the event of a failure. ListState is a State API similar to Java List interface provided by Flink, which integrates the framework’s checkpoint mechanism and automatically achieves the exact-once semantic guarantee.

private static class TopNHotUsers extends KeyedProcessFunction<Tuple.UserViewCount.String> {

    private int topSize;
    private ListState<UserViewCount> userViewCountListState;

    public TopNHotUsers(int topSize) {
        this.topSize = topSize;
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        List<UserViewCount> userViewCounts = new ArrayList<>();
        for(UserViewCount userViewCount : userViewCountListState.get()) {
            userViewCounts.add(userViewCount);
        }

        userViewCountListState.clear();

        userViewCounts.sort(new Comparator<UserViewCount>() {
            @Override
            public int compare(UserViewCount o1, UserViewCount o2) {
                return (int)(o2.viewCount - o1.viewCount); }});// Format the ranking information into a String for easy printing
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("Time:").append(new Timestamp(timestamp-1)).append("\n");
        for (int i = 0; i < topSize; i++) {
            UserViewCount currentItem = userViewCounts.get(i);
            // No1: merchandise ID=12224 views =2413
            result.append("No").append(i).append(":")
                    .append("Username =").append(currentItem.userName)
                    .append("Page Views =").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");

        Thread.sleep(1000);

        out.collect(result.toString());

    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor<UserViewCount> userViewCountListStateDescriptor = new ListStateDescriptor<>(
                "user-state",
                UserViewCount.class
        );
        userViewCountListState = getRuntimeContext().getListState(userViewCountListStateDescriptor);

    }

    @Override
    public void processElement(UserViewCount value, Context ctx, Collector<String> out) throws Exception {
        userViewCountListState.add(value);
        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000); }}Copy the code

Results output

As you can see, the output data is updated every 5 seconds.

Reference wuchong. Me/blog / 2018/1…

Public account: Java big data and data warehouse, get data, learn big data technology.