Author: Chong Wu
In our last introductory tutorial, we were able to quickly build a basic Apache Flink program. This article takes you step-by-step through a more sophisticated Flink application: Real-time Hot Goods. Before starting this article, we recommend that you practice the previous article, because this article will use the my-Flink-project framework mentioned above.
In this article you will learn:
-
How do I do it based on EventTime, and how do I specify Watermark
-
How to use Flink’s flexible Window API
-
When and how does State need to be used
-
How do I use ProcessFunction to implement TopN functionality
Practical case introduction
This example will implement a requirement for “real-time hot items”, which we can translate into a requirement better understood by programmers: output the top N items that have been clicked most in the last hour every 5 minutes. To break this requirement down, we need to do a few things:
-
Extract the business timestamp and tell the Flink framework to make Windows based on the business time
-
Filter out click behavior data
-
Perform Sliding Window aggregation every 5 minutes according to the size of the Window for one hour.
-
Click on each window aggregate to output the top N click items in each window
Data preparation
Here we have prepared a data set of Taobao user behavior (from Ali Yuntianchi public data set, special thanks). This data set contains all the behaviors (including click, purchase, add purchase and favorites) of one million random users on Taobao on a given day. The data set is organized in a similar way to Movielens-20M, that is, each row of the data set represents a user behavior, consisting of user ID, item ID, item category ID, behavior type, and timestamp, separated by commas. A detailed description of each column in the dataset is as follows:
Column name | instructions |
---|---|
The user ID | The value is an integer and is an encrypted user ID |
Product ID | The encrypted commodity ID is an integer |
Item category ID | The value is an integer. It is the ID of the category to which the encrypted commodity belongs |
Types of behaviour | String, enumerated type, including (‘pv’, ‘buy’, ‘cart’, ‘fav’) |
The time stamp | The timestamp, in seconds, at which the behavior occurred |
You can download the dataset to the resources directory of your project by using the following command:
$ cd my-flink-project/src/main/resources
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv
Copy the code
Using curl to download the data is not important. You can also use the wget command or access the url to download the data. The key is to save the data files in your project’s Resources directory for easy application access.
Write a program
Create hotitems.java under SRC /main/ Java /myflink:
package myflink;
public class HotItems {
public static void main(String[] args) throws Exception {}}Copy the code
As before, we will fill in the code step by step. The first step is still create a StreamExecutionEnvironment, we add it into the main function.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// To print the results to the console out of order, we set the global concurrency to 1, where changing the concurrency has no effect on the correctness of the results
env.setParallelism(1);
Copy the code
Creating a mock data source
In the data Preparation section, we have downloaded the test data set locally. Since this is a CSV file, we will create the mock data source using CsvInputFormat.
Note: Although a streaming application should be an always-running application, it needs to consume an unlimited data source. However, in this case tutorial, to save the hassle of building real data sources, we use files to simulate real data sources, which does not affect the following knowledge. This is also a common way to verify the correctness of Flink applications locally.
We will create a POJO class for UserBehavior (all member variables declared public is the POJO class), strongly typed to facilitate subsequent processing.
/** User behavior data structure **/
public static class UserBehavior {
public long userId; / / user ID
public long itemId; ID / / commodities
public int categoryId; // Product category ID
public String behavior; // User behavior, including ("pv", "buy", "cart", "FAv ")
public long timestamp; // The timestamp when the behavior occurred, in seconds
}
Copy the code
Next we can create a PojoCsvInputFormat, which is an input that reads the CSV file and converts each line to the specified POJO type (in our case, UserBehavior).
// the local file path of userbehavior.csv
URL fileUrl = HotItems2.class.getClassLoader().getResource("UserBehavior.csv");
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// Extract UserBehavior's TypeInformation, which is a PojoTypeInfo
PojoTypeInfo pojoType = (PojoTypeInfo) TypeExtractor.createTypeInfo(UserBehavior.class);
// Since the order of the fields extracted by Java reflection is uncertain, you need to explicitly specify the order of the fields in the file below
String[] fieldOrder = new String[]{"userId"."itemId"."categoryId"."behavior"."timestamp"};
/ / create PojoCsvInputFormat
PojoCsvInputFormat csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);
Copy the code
Next we create the input source using PojoCsvInputFormat.
DataStream dataSource = env.createInput(csvInput, pojoType);
Copy the code
This creates a DataStream of type UserBehavior.
EventTime and Watermark
When we say “count the hits in the last hour”, what does “one hour” mean? In Flink it can mean either ProcessingTime or EventTime, as determined by the user.
-
ProcessingTime: Time at which the event is processed. Which is determined by the system time of the machine.
-
EventTime: Indicates the time when an event occurs. Generally, it is the time that the data itself carries.
In this case, we need to count clicks per hour on business time, so we do it based on EventTime. So what if we let Flink handle the business time we want? There are two main things to do here.
The first thing is to tell Flink that we are now processing in EventTime mode. Flink uses ProcessingTime by default, so we need to set it explicitly.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Copy the code
The second thing is to specify how to get business time and generate Watermark. Watermark is a concept used to track business events, which can be interpreted as a clock in the EventTime world that indicates when data is currently being processed. Since the data in our data source has been organized and not out of order, i.e. the event timestamp is monotonically increasing, the business time of each piece of data can be used as the Watermark. Here we use AscendingTimestampExtractor to achieve timestamp extract and the generation of Watermark.
Note: real business scenarios are generally exist the out-of-order, use BoundedOutOfOrdernessTimestampExtractor commonly so.
DataStream timedData = dataSource
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
// Convert the raw data in seconds to milliseconds
return userBehavior.timestamp * 1000; }});Copy the code
Now we have a time-stamped data stream, and we can do some windowing later.
Filter out click events
Before starting the window operation, review the requirement that “output the top N most-clicked items in the past hour every 5 minutes”. Since there are various behaviors of click, add purchase, purchase and favorites in the original data, we only need to count the clicks, so we first use FilterFunction to filter out the click behavior data.
DataStream pvData = timedData
.filter(new FilterFunction() {
@Override
public boolean filter(UserBehavior userBehavior) throws Exception {
// Filter out click-only data
return userBehavior.behavior.equals("pv"); }});Copy the code
Window counts clicks
The window size is one hour and slides every five minutes because the number of clicks on each item in the last hour is counted every five minutes. That is, the click quantity of commodities in [09:00, 10:00), [09:05, 10:05), [09:10, 10:10), etc., is a common Sliding Window demand.
DataStream windowedData = pvData
.keyBy("itemId")
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());
Copy the code
We use.keyby (“itemId”) to group items, and use.timewindow (Time size, Time slide) to make a slide window for each item (1 hour window, 5 minutes slide). Then we use. Aggregate (AggregateFunction AF, WindowFunction WF) for incremental aggregation operation, which can use AggregateFunction to aggregate data in advance and reduce the storage pressure of state. Compared to.apply(WindowFunction wf), which stores all the data in the window, the final calculation is much more efficient. The first parameter of the aggregate() method is used
AggregateFunction CountAgg implements the AggregateFunction interface, which counts the number of entries in the window.
/** COUNT aggregate function implementation of statistics, each occurrence of a record plus one */
public static class CountAgg implements AggregateFunction {
@Override
public Long createAccumulator(a) {
return 0L;
}
@Override
public Long add(UserBehavior userBehavior, Long acc) {
return acc + 1;
}
@Override
public Long getResult(Long acc) {
return acc;
}
@Override
public Long merge(Long acc1, Long acc2) {
returnacc1 + acc2; }}Copy the code
. Aggregate (AggregateFunction af, WindowFunction wF) The second parameter WindowFunction outputs the aggregated results of each key and each window with other information. The WindowResultFunction that we’re implementing here wraps the primary key item ID, window, click count as ItemViewCount.
/** the result of the output window */
public static class WindowResultFunction implements WindowFunction {
@Override
public void applyTuple key, itemId TimeWindow window, Iterable aggregateResult, // Result of aggregate function, Collector Collector // Output type ItemViewCount) throws Exception { Long itemId = ((Tuple1) key).f0; Long count = aggregateResult.iterator().next(); collector.collect(ItemViewCount.of(itemId, window.getEnd(), count)); }}/** Product clicks (window operation output type) */
public static class ItemViewCount {
public long itemId; ID / / commodities
public long windowEnd; // Window end timestamp
public long viewCount; // The number of clicks on the product
public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
ItemViewCount result = new ItemViewCount();
result.itemId = itemId;
result.windowEnd = windowEnd;
result.viewCount = viewCount;
returnresult; }}Copy the code
Now we have a stream of clicks for each item in each window.
TopN counts the hottest items
To count the most popular items in each window, we need to group them again by window, here keyBy() according to windowEnd in ItemViewCount. You then use ProcessFunction to implement a custom TopN function, TopNHotItems, to calculate the top 3 clickable items and format the ranking results into strings for subsequent output.
DataStream topItems = windowedData
.keyBy("windowEnd")
.process(new TopNHotItems(3)); // Get the top 3 clicks
Copy the code
ProcessFunction is a low-level API provided by Flink for more advanced functionality. It mainly provides timer function (support EventTime or ProcessingTime). In this case, we will use timer to determine when the click data of all goods under a window is collected. Because Watermark progress is global,
In the processElement method, we register a windowEnd+1 timer every time we receive a piece of data (ItemViewCount) (the Flink framework automatically ignores double registrations at the same time). When the windowEnd+1 timer is triggered, it means that the Watermark of windowEnd+1 is received, that is, all the merchandise window statistics under that windowEnd are collected. In onTimer(), we sort all the collected items and clicks, select TopN, format the ranking information into a string and output it.
Here we also use ListState
to store each ItemViewCount message we receive, ensuring non-loss and consistency of state data in the event of a failure. ListState is a State API similar to Java List interface provided by Flink, which integrates the framework’s checkpoint mechanism and automatically achieves the exact-once semantic guarantee.
/** find the TopN items in a window, key is the window timestamp, output as TopN result string */
public static class TopNHotItems extends KeyedProcessFunction {
private final int topSize;
public TopNHotItems(int topSize) {
this.topSize = topSize;
}
// It is used to store the status of goods and clicks. After collecting the data of the same window, TopN calculation will be triggered
private ListState itemState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// State registration
ListStateDescriptor itemsStateDesc = new ListStateDescriptor<>(
"itemState-state",
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}
@Override
public void processElement( ItemViewCount input, Context context, Collector collector) throws Exception {
// Each piece of data is saved to the state
itemState.add(input);
// Register an EventTime Timer for windowEnd+1. When triggered, all items belonging to the windowEnd window are collected
context.timerService().registerEventTimeTimer(input.windowEnd + 1);
}
@Override
public void onTimer(
long timestamp, OnTimerContext ctx, Collector out) throws Exception {
// Get clicks on all items received
List allItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
allItems.add(item);
}
// Clear state data in advance to free space
itemState.clear();
// Sort from most clicks to least clicks
allItems.sort(new Comparator() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) (o2.viewCount - o1.viewCount); }});// Format the ranking information into a String for easy printing
StringBuilder result = new StringBuilder();
result.append("====================================\n");
result.append("Time:").append(new Timestamp(timestamp-1)).append("\n");
for (int i=0; iCopy the code
A printout
As a final step, we print the results to the console and call env.execute to execute the task.
topItems.print();
env.execute("Hot Items Job");
Copy the code
To run the program
Run the main function directly, and you’ll see the popular product ids at each point in time.
The full code for this article can be accessed on GitHub. This article learns and practices several of Flink’s core concepts and API usage by implementing a “real-time hot Commodity” case. Including the use of EventTime, Watermark, State, Window API, and TopN implementation. I hope this article can deepen your understanding of Flink and help you solve the problems encountered in actual combat.
For more information, please visit the Apache Flink Chinese community website