Background
In real-time big data processing, live dashboards shown on large screens have become an important way to present results. The best-known example is the big screen that shows total sales in real time on Singles' Day. Other scenarios work the same way, for example showing a website's current PV (page views) and UV (unique visitors) in real time in a back-office system.
In this article we simulate a small e-commerce statistics dashboard, reduced to the most basic requirements:
- Calculate, in real time, the total sales from midnight up to the current time
- Find the top 3 categories by sales
- Update the statistics every second
Example walkthrough
Generating the data
First, we simulate order generation with a custom source that emits Tuple2 records: the first element is the category, and the second is the amount of an order placed in that category.
```java
import java.util.Random;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Simulates orders: each element is (category, order amount). */
public static class MySource implements SourceFunction<Tuple2<String, Double>> {
    private volatile boolean isRunning = true;
    private final Random random = new Random();
    private final String[] category = {
        "Women's", "Men's", "Book", "Home appliances",
        "Care", "Beauty", "Sports", "Game",
        "Outdoor", "Furniture", "Instrument", "Office"
    };

    @Override
    public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(10);
            // Pick a category; nextInt(length) can return every index, including the last one
            String c = category[random.nextInt(category.length)];
            // Amount of one completed order in this category, in [0, 100)
            double price = random.nextDouble() * 100;
            ctx.collect(Tuple2.of(c, price));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```
Defining the result class
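```java
public static class CategoryPojo {
    // Category name
    private String category;
    // Total sales of this category since midnight
    private double totalPrice;
    // Time at which the result was produced
    private String dateTime;

    // Public no-arg constructor, getters/setters and toString() omitted;
    // keyBy("dateTime") needs Flink to recognize this class as a POJO
}
```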
Defining the window and trigger
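```java
DataStream<CategoryPojo> result = dataStream
        // Key by category (field 0 of the Tuple2)
        .keyBy(0)
        // One-day tumbling window; the -8h offset aligns it with midnight in UTC+8
        .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
        // Fire every second instead of waiting for the window to close
        .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
        .aggregate(new PriceAggregate(), new WindowResult());
```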
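First we define a one-day tumbling window, then set a trigger that fires every second, and finally aggregate the data. By default, processing-time windows are aligned to the epoch in UTC, so a one-day window would run from 08:00 to 08:00 local time in China (UTC+8); the offset of Time.hours(-8) moves the window boundaries to local midnight. ContinuousProcessingTimeTrigger fires the window once per second without purging it, so every firing emits the running total since midnight, and the window state is cleared only when the day ends.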
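To see why the -8 hour offset lands on local midnight, the plain-Java sketch below (no Flink dependency) reproduces the window-start arithmetic; to our understanding it follows the same formula as Flink's TimeWindow.getWindowStartWithOffset. The names WindowStartDemo and windowStart are hypothetical, chosen for illustration only.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

/** Plain-Java illustration of how a tumbling window's start is computed (no Flink needed). */
class WindowStartDemo {

    /** Start of the window of length sizeMs, shifted by offsetMs, that contains timestampMs. */
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        long remainder = (timestampMs - offsetMs) % sizeMs;
        // Java's % keeps the sign of the dividend; move negative remainders into [0, sizeMs)
        if (remainder < 0) {
            remainder += sizeMs;
        }
        return timestampMs - remainder;
    }

    public static void main(String[] args) {
        ZoneOffset chinaStandardTime = ZoneOffset.ofHours(8);
        long day = Duration.ofDays(1).toMillis();
        long offset = Duration.ofHours(-8).toMillis();
        // An order processed at 22:55:34 local time (UTC+8)
        long ts = OffsetDateTime.of(2020, 6, 13, 22, 55, 34, 0, chinaStandardTime)
                .toInstant().toEpochMilli();
        Instant start = Instant.ofEpochMilli(windowStart(ts, offset, day));
        System.out.println(start.atOffset(chinaStandardTime)); // prints 2020-06-13T00:00+08:00
    }
}
```

With an offset of 0, the same timestamp falls into a window starting at 2020-06-13T00:00Z, which is 08:00 local time, so the daily total would reset at 8 a.m. instead of midnight.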
Aggregation
```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/** Sums order amounts: input (category, amount), accumulator and result are Double. */
private static class PriceAggregate
        implements AggregateFunction<Tuple2<String, Double>, Double, Double> {

    @Override
    public Double createAccumulator() {
        return 0D;
    }

    @Override
    public Double add(Tuple2<String, Double> value, Double accumulator) {
        return accumulator + value.f1;
    }

    @Override
    public Double getResult(Double accumulator) {
        return accumulator;
    }

    @Override
    public Double merge(Double a, Double b) {
        return a + b;
    }
}
```
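The aggregation is simple: it sums the order amounts (the second field of the tuple) for each category. Because the aggregation is incremental, the window keeps a single running sum per category instead of buffering every order.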
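The effect of combining this incremental sum with a trigger that fires every second but never purges can be simulated without Flink. In this sketch, RunningTotalDemo and fireEverySecond are hypothetical names; each inner list stands for the orders of one category arriving during one second, and the three static methods play the roles of createAccumulator, add and getResult.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java simulation of one category's window state when an incremental sum is
 * combined with a trigger that fires every second and never purges.
 */
class RunningTotalDemo {

    // Same roles as createAccumulator / add / getResult in PriceAggregate
    static double createAccumulator() {
        return 0D;
    }

    static double add(double amount, double accumulator) {
        return accumulator + amount;
    }

    static double getResult(double accumulator) {
        return accumulator;
    }

    /** Each inner list holds the order amounts that arrive during one second. */
    static List<Double> fireEverySecond(List<List<Double>> ordersPerSecond) {
        double accumulator = createAccumulator();
        List<Double> emitted = new ArrayList<>();
        for (List<Double> second : ordersPerSecond) {
            for (double amount : second) {
                // Only the running sum is kept, never the individual orders
                accumulator = add(amount, accumulator);
            }
            // "Fire" without purging: report the sum and keep it for the next second
            emitted.add(getResult(accumulator));
        }
        return emitted;
    }
}
```

For the input [[10.0, 5.0], [2.5], []] the sketch emits [15.0, 17.5, 17.5]: a second without orders still reports the unchanged total.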
Collecting the window result
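```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Wraps the aggregated sum of one category into a CategoryPojo. */
private static class WindowResult
        implements WindowFunction<Double, CategoryPojo, Tuple, TimeWindow> {

    private final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Double> input,
                      Collector<CategoryPojo> out) throws Exception {
        CategoryPojo categoryPojo = new CategoryPojo();
        // keyBy(0) produces a Tuple key whose field 0 is the category
        categoryPojo.setCategory(((Tuple1<String>) key).f0);
        // After pre-aggregation the iterable holds exactly one value: the running sum
        BigDecimal bg = BigDecimal.valueOf(input.iterator().next());
        double p = bg.setScale(2, RoundingMode.HALF_UP).doubleValue();
        categoryPojo.setTotalPrice(p);
        categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
        out.collect(categoryPojo);
    }
}
```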
WindowResult simply wraps each aggregated sum, rounded to two decimal places and stamped with the current time, into a CategoryPojo for the next processing step.
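WindowResult rounds the running sum to two decimal places with BigDecimal. How the BigDecimal is built matters for some inputs: new BigDecimal(double) uses the exact binary value of the double, whereas BigDecimal.valueOf(double) goes through Double.toString. The plain-Java sketch below (RoundingDemo and its methods are hypothetical names) shows the difference on 1.005, which is stored as slightly less than 1.005.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Two ways of rounding a double to two decimal places with BigDecimal. */
class RoundingDemo {

    // Uses the exact binary value of v; 1.005 is actually stored as 1.00499999...
    static double roundExact(double v) {
        return new BigDecimal(v).setScale(2, RoundingMode.HALF_UP).doubleValue();
    }

    // Uses Double.toString(v), i.e. the shortest decimal string that identifies v
    static double roundCanonical(double v) {
        return BigDecimal.valueOf(v).setScale(2, RoundingMode.HALF_UP).doubleValue();
    }
}
```

roundExact(1.005) returns 1.0 while roundCanonical(1.005) returns 1.01; for an ordinary amount such as 734.4549 both return 734.45.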
Processing the aggregated results
```java
result.keyBy("dateTime")
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .process(new WindowResultProcess());
```
Next we consume the aggregated stream produced above. Keying it by dateTime puts all per-category results stamped with the same second into the same group, and a one-second tumbling window collects each group so its results can be combined.
For more on working with window results, see the Flink documentation [1].
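Conceptually, keyBy("dateTime") followed by a one-second window gathers every per-category result that carries the same formatted timestamp. The sketch below expresses that grouping on a plain list with Collectors.groupingBy; GroupBySecondDemo and its CategoryTotal class are hypothetical stand-ins for the Flink job and CategoryPojo.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Groups per-category results by the second in which they were produced. */
class GroupBySecondDemo {

    /** Hypothetical stand-in for CategoryPojo. */
    static final class CategoryTotal {
        final String category;
        final double totalPrice;
        final String dateTime;

        CategoryTotal(String category, double totalPrice, String dateTime) {
            this.category = category;
            this.totalPrice = totalPrice;
            this.dateTime = dateTime;
        }
    }

    /** Maps each formatted timestamp to the categories reported in that second, in input order. */
    static Map<String, List<String>> categoriesBySecond(List<CategoryTotal> results) {
        return results.stream().collect(Collectors.groupingBy(
                r -> r.dateTime,  // same role as keyBy("dateTime")
                TreeMap::new,     // keys sorted by time for readable output
                Collectors.mapping(r -> r.category, Collectors.toList())));
    }
}
```

Because the key is a wall-clock string, results end up in the same group only if they were stamped with the same second.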
Computing the final statistics
For the final statistics, we add up the totals of all categories to get the total sales of the whole site, and at the same time use a priority queue to find the top 3 categories by sales, then print the result. In production, the results could instead be written to external storage such as HBase or Redis for real-time display on the front-end page.
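The summing and top-3 selection described above can be tried out on plain Java collections before wiring them into a ProcessWindowFunction. This sketch (Top3Demo, topK and grandTotal are hypothetical names) keeps a min-heap of size k, so each category is compared only with the smallest of the current top k, and it sums the totals as BigDecimal values, which avoids floating-point artifacts such as a total of 9617.050000000001.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

class Top3Demo {

    /** Returns the k (k >= 1) categories with the largest totals, largest first. */
    static List<Map.Entry<String, Double>> topK(Map<String, Double> totals, int k) {
        Comparator<Map.Entry<String, Double>> byTotal = Comparator.comparingDouble(e -> e.getValue());
        // Min-heap: the smallest of the current top k sits at the head
        PriorityQueue<Map.Entry<String, Double>> heap = new PriorityQueue<>(k, byTotal);
        for (Map.Entry<String, Double> e : totals.entrySet()) {
            if (heap.size() < k) {
                heap.add(e);
            } else if (e.getValue() > heap.peek().getValue()) {
                heap.poll(); // evict the current smallest
                heap.add(e);
            }
        }
        List<Map.Entry<String, Double>> result = new ArrayList<>(heap);
        result.sort(byTotal.reversed());
        return result;
    }

    /** Sums the totals in decimal arithmetic instead of double arithmetic. */
    static BigDecimal grandTotal(Map<String, Double> totals) {
        BigDecimal sum = BigDecimal.ZERO;
        for (double v : totals.values()) {
            sum = sum.add(BigDecimal.valueOf(v));
        }
        return sum;
    }
}
```

Fed with the eleven per-category totals from the sample run, topK returns the categories with 1225.34, 1018.88 and 1010.12, and grandTotal returns exactly 9617.05.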
```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.stream.Collectors;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Computes the site-wide total and the top 3 categories for one second. */
private static class WindowResultProcess
        extends ProcessWindowFunction<CategoryPojo, Object, Tuple, TimeWindow> {

    @Override
    public void process(Tuple tuple, Context context, Iterable<CategoryPojo> elements,
                        Collector<Object> out) throws Exception {
        String date = ((Tuple1<String>) tuple).f0;
        // Min-heap on total sales: the head is the smallest of the current top 3
        Queue<CategoryPojo> queue =
                new PriorityQueue<>(3, Comparator.comparingDouble(CategoryPojo::getTotalPrice));
        double price = 0D;
        for (CategoryPojo categoryPojo : elements) {
            if (queue.size() < 3) {
                queue.add(categoryPojo);
            } else if (categoryPojo.getTotalPrice() > queue.peek().getTotalPrice()) {
                // Replace the smallest entry with the larger one
                queue.poll();
                queue.add(categoryPojo);
            }
            price += categoryPojo.getTotalPrice();
        }
        // Largest first
        List<String> list = queue.stream()
                .sorted(Comparator.comparingDouble(CategoryPojo::getTotalPrice).reversed())
                .map(f -> "(Category: " + f.getCategory() + ", Sales: " + f.getTotalPrice() + ")")
                .collect(Collectors.toList());
        // Printed here; in production, emit via out.collect(...) to a sink instead
        System.out.println("Time: " + date + " Total price: " + price + " top3 " + String.join(",", list));
        System.out.println("-------------------------------");
    }
}
```
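Sample output (lines prefixed with N> come from printing a stream with Flink's print(), where N is the index of the parallel subtask; the last line is printed by WindowResultProcess):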
```
3> CategoryPojo{category='Outdoor', totalPrice=734.45, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='Game', totalPrice=862.86, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='Care', totalPrice=926.83, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='Sports', totalPrice=744.98, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='Instrument', totalPrice=648.81, dateTime=2020-06-13 22:55:34}
4> CategoryPojo{category='Book', totalPrice=1010.12, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='Furniture', totalPrice=880.35, dateTime=2020-06-13 22:55:34}
3> CategoryPojo{category='Home appliances', totalPrice=1225.34, dateTime=2020-06-13 22:55:34}
2> CategoryPojo{category='Men's', totalPrice=796.06, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='Women's', totalPrice=1018.88, dateTime=2020-06-13 22:55:34}
1> CategoryPojo{category='Beauty', totalPrice=768.37, dateTime=2020-06-13 22:55:34}
Time: 2020-06-13 22:55:34 Total price: 9617.050000000001 top3 (Category: Home appliances, Sales: 1225.34),(Category: Women's, Sales: 1018.88),(Category: Book, Sales: 1010.12)
```
The complete code is available at:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/windows/BigScreem.java
References
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#working-with-window-results