Background demand
In the field of e-commerce, if a user buys a product and does not make an evaluation within 24 hours after the order is completed, the system will automatically give five-star praise. Today, we mainly use flink timer to simply realize this function.
Case,
Custom source
First, we simulate generating some order data by customizing the source. Here, we generate the simplest Tuple2 with the order ID and order completion time fields.
public static class MySource implements SourceFunction<Tuple2<String.Long>>{
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Tuple2<String,Long>> ctx) throws Exception{ while (isRunning){ Thread.sleep(1000); / / order id String orderid = UUID.randomUUID().toString(); // Order completion time long orderFinishTime = System.currentTimeMillis(); ctx.collect(Tuple2.of(orderid, orderFinishTime)); } } @Override public void cancel(a){ isRunning = false; } } Copy the code
Timing processing logic
Let’s start with the code and explain the code in turn
public static class TimerProcessFuntion
extends KeyedProcessFunction<Tuple.Tuple2<String.Long>,Object>{
private MapState<String,Long> mapState;
// If there is no comment for more than interval (milliseconds), it will automatically receive five-star praise private long interval = 0l; public TimerProcessFuntion(long interval){ this.interval = interval; } @Override public void open(Configuration parameters){ MapStateDescriptor<String,Long> mapStateDesc = new MapStateDescriptor<>( "mapStateDesc". String.class, Long.class); mapState = getRuntimeContext().getMapState(mapStateDesc); } @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception{ Iterator iterator = mapState.iterator(); while (iterator.hasNext()){ Map.Entry<String,Long> entry = (Map.Entry<String,Long>) iterator.next(); String orderid = entry.getKey(); boolean f = isEvaluation(entry.getKey()); mapState.remove(orderid); if (f){ LOG.info("The order (OrderID: {}) has been evaluated in {} milliseconds and will not be processed", orderid, interval); } if (f){ // If the user does not make a rating, give the default five-star rating to the relevant interface in the call LOG.info("Order (OrderID: {}) is not evaluated after {} milliseconds, call interface gives five star automatic praise", orderid, interval); } } } / * ** Whether the user has evaluated the order, in the production environment, can go to the relevant order system.* We're just making a passing judgment here * * @param key * @return * / private boolean isEvaluation(String key){ return key.hashCode() % 2= =0; } @Override public void processElement( Tuple2<String,Long> value, Context ctx, Collector<Object> out) throws Exception{ mapState.put(value.f0, value.f1); ctx.timerService().registerProcessingTimeTimer(value.f1 + interval); } } Copy the code
- First we define a state of type MapState where key is the order number and value is the order completion time
- When processElement processes the data, the information about each order is stored in the state. No processing is done and a timer is registered that takes longer than the order completion time than the interval.
- The onTimer method is triggered when a registered timed task reaches the time of the timer, which is where we mainly process it. We call the external interface to determine whether the user has made a comment, if not, the call interface will give five-star praise, if so, nothing will be processed, and remember to delete the corresponding order from MapState
Please refer to the complete code
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/timer/AutoEvaluation.java
Welcome to pay attention to my public [big data technology and application practice], timely access to more information