Background and requirements

In e-commerce, if a user buys a product and does not leave a review within 24 hours after the order is completed, the system automatically gives a five-star rating. In this article, we use Flink timers to implement a simple version of this feature.

Example

Custom source

First, we simulate order data with a custom source. To keep things simple, each record is a Tuple2 holding the order ID and the order completion time.


    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.UUID;

    public static class MySource implements SourceFunction<Tuple2<String, Long>> {
        // volatile so that cancel(), called from another thread, is seen by run()
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
            while (isRunning) {
                // Emit one simulated order per second
                Thread.sleep(1000);
                // Order ID
                String orderid = UUID.randomUUID().toString();
                // Order completion time
                long orderFinishTime = System.currentTimeMillis();
                ctx.collect(Tuple2.of(orderid, orderFinishTime));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
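The emit loop and its cancel flag can be tried without a Flink cluster. The following is only a minimal sketch under stated assumptions: `OrderLoopSketch` and its `BiConsumer` collector are hypothetical stand-ins for `MySource` and `SourceContext`, not Flink classes, and the one-second sleep is left out so the example finishes immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for MySource that uses no Flink classes. */
final class OrderLoopSketch {
    // Same role as in MySource: cancel() flips the flag and run() stops looping.
    private volatile boolean isRunning = true;

    /** Emits (orderId, orderFinishTime) pairs until cancel() is called. */
    void run(BiConsumer<String, Long> collector) {
        while (isRunning) {
            String orderId = UUID.randomUUID().toString();
            long orderFinishTime = System.currentTimeMillis();
            collector.accept(orderId, orderFinishTime);
        }
    }

    void cancel() {
        isRunning = false;
    }

    public static void main(String[] args) {
        OrderLoopSketch source = new OrderLoopSketch();
        List<String> orderIds = new ArrayList<>();
        // Stop the source after three orders have been collected.
        source.run((orderId, finishTime) -> {
            orderIds.add(orderId);
            if (orderIds.size() == 3) {
                source.cancel();
            }
        });
        System.out.println(orderIds.size()); // prints 3
    }
}
```

In the real source, Flink calls `cancel()` from a different thread than the one executing `run()`, which is why the flag is declared `volatile`.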

Timer processing logic

Let's look at the code first and then walk through it step by step.


    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    import java.util.Iterator;
    import java.util.Map;

    // LOG is assumed to be an SLF4J logger declared on the enclosing class.
    public static class TimerProcessFuntion
            extends KeyedProcessFunction<Tuple, Tuple2<String, Long>, Object> {

        // Order ID -> order completion time
        private MapState<String, Long> mapState;
        // If there is no review after interval (milliseconds), the order automatically gets a five-star rating
        private long interval = 0L;

        public TimerProcessFuntion(long interval) {
            this.interval = interval;
        }

        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>(
                    "mapStateDesc", String.class, Long.class);
            mapState = getRuntimeContext().getMapState(mapStateDesc);
        }

        @Override
        public void onTimer(
                long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> entry = iterator.next();
                String orderid = entry.getKey();
                boolean f = isEvaluation(orderid);
                // Remove through the iterator so the state is not modified behind its back
                iterator.remove();
                if (f) {
                    LOG.info("The order (OrderID: {}) has been evaluated in {} milliseconds and will not be processed", orderid, interval);
                } else {
                    // The user has not left a review, so call the relevant interface to give the default five-star rating
                    LOG.info("Order (OrderID: {}) is not evaluated after {} milliseconds, call interface gives five star automatic praise", orderid, interval);
                }
            }
        }

        /**
         * Whether the user has reviewed the order. In a production environment this
         * would query the order system; here we simply fake the result.
         *
         * @param key the order ID
         * @return true if the order has already been reviewed
         */
        private boolean isEvaluation(String key) {
            return key.hashCode() % 2 == 0;
        }

        @Override
        public void processElement(
                Tuple2<String, Long> value, Context ctx, Collector<Object> out) throws Exception {
            mapState.put(value.f0, value.f1);
            // Fire once the review window (completion time + interval) has passed
            ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
        }
    }

  1. First, we define a state of type MapState, where the key is the order ID and the value is the order completion time.
  2. In processElement, each incoming order is stored in the state without any further processing, and a processing-time timer is registered to fire at the order completion time plus interval.
  3. When a registered timer reaches its timestamp, the onTimer method is invoked, and this is where the main logic lives. We call an external interface to check whether the user has already left a review. If not, we call the interface that gives the automatic five-star rating; if so, nothing needs to be done. In both cases, remember to delete the order from MapState.
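The three steps above can also be traced in plain Java, without Flink. This is only a sketch under stated assumptions: `AutoPraiseSimulation`, `PendingTimer`, `advanceTo`, `autoPraised` and `pendingOrders` are hypothetical names invented for illustration, a `HashMap` stands in for the MapState, and a `PriorityQueue` stands in for Flink's timer service. It mimics the store, register and fire flow only, not Flink's keyed-state semantics or fault tolerance.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.Predicate;

/** Plain-Java stand-in for the state and the timer service; not Flink API. */
final class AutoPraiseSimulation {
    /** A pending timer: fire at fireTime for the given order. */
    private static final class PendingTimer {
        final long fireTime;
        final String orderId;

        PendingTimer(long fireTime, String orderId) {
            this.fireTime = fireTime;
            this.orderId = orderId;
        }
    }

    // Step 1: order ID -> order completion time (the role of the MapState)
    private final Map<String, Long> state = new HashMap<>();
    private final PriorityQueue<PendingTimer> timers =
            new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.fireTime));
    private final long interval;
    private final Predicate<String> isEvaluated;
    private final List<String> autoPraised = new ArrayList<>();

    AutoPraiseSimulation(long intervalMillis, Predicate<String> isEvaluated) {
        this.interval = intervalMillis;
        this.isEvaluated = isEvaluated;
    }

    /** Step 2: store the order and register a timer at completion time + interval. */
    void processElement(String orderId, long finishTime) {
        state.put(orderId, finishTime);
        timers.add(new PendingTimer(finishTime + interval, orderId));
    }

    /** Step 3: fire every timer whose time has been reached. */
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek().fireTime <= now) {
            String orderId = timers.poll().orderId;
            // Always delete the order from the state, reviewed or not.
            if (state.remove(orderId) == null) {
                continue;
            }
            if (!isEvaluated.test(orderId)) {
                autoPraised.add(orderId); // here the real job would call the rating interface
            }
        }
    }

    List<String> autoPraised() {
        return autoPraised;
    }

    int pendingOrders() {
        return state.size();
    }

    public static void main(String[] args) {
        long day = Duration.ofHours(24).toMillis();
        // Assumption for the demo: only "order-2" has been reviewed by the user.
        AutoPraiseSimulation sim = new AutoPraiseSimulation(day, id -> id.equals("order-2"));
        sim.processElement("order-1", 0L);
        sim.processElement("order-2", 1_000L);

        sim.advanceTo(day - 1);
        System.out.println(sim.autoPraised()); // prints []

        sim.advanceTo(day + 1_000L);
        System.out.println(sim.autoPraised()); // prints [order-1]
    }
}
```

The clock here is advanced by hand, which makes the 24-hour window easy to test; in the Flink job, the processing-time timer service advances with the wall clock instead.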

The complete code is available at:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/timer/AutoEvaluation.java
