1. Without further ado, the problem first (a real requirement from an Internet company)
##### If an order is not paid within one hour of being created, it is cancelled automatically and a penalty is applied.
- This article uses RocketMQ 4.3.1, JDK 1.8, and Protostuff 1.1.3.
2. POM file (excerpt)

```xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.1</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.1.3</version>
</dependency>
```
3. Straight to the code: the producer

```java
// In the calling service
@Autowired
private NoticeService noticeService;

private RuntimeSchema<String> timeSchema = RuntimeSchema.createFrom(String.class);

public void test() {
    String messages = buildMQMessage(merchantOrder.getOrderNo(), p.getInvoke(), p.getMethod(), Datas.BORROW);
    this.noticeService.delayNotice(messages, this.timeSchema, "OtcTimer", "timer", p.getTimeLevel());
}

/**
 * Builds the JSON message body: the order number, the method to call by
 * reflection when the timer fires, the current method (for logging), and the type.
 */
protected String buildMQMessage(String orderNo, String invoke, String method, String type) {
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("orderNo", orderNo);
    jsonObject.put("invoke", invoke);
    jsonObject.put("method", method);
    jsonObject.put("type", type);
    return jsonObject.toJSONString();
}

// In NoticeService (the target of noticeService.delayNotice above)
/**
 * messages: the message body.
 * schema: the Protostuff schema; a buffer is added to make serialization faster.
 * Delay levels: messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 */
public <T> void delayNotice(T messages, RuntimeSchema<T> schema, String topic, String tags, Integer timeLevel) {
    String key = OtcUtil.createUUId();
    try {
        byte[] bytes = ProtostuffIOUtil.toByteArray(messages, schema,
                LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
        Message message = new Message(topic, tags, key, bytes); // message body
        message.setDelayTimeLevel(timeLevel);                   // delay level (index into the list above)
        SendResult sendResult = this.defaultMQProducer.send(message);
        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
            // sent successfully
        }
    } catch (Exception e) {
        logger.error("sendMq onException , key : " + key, e);
    }
}
```
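To make the "level" argument concrete, here is a minimal sketch that looks up which level corresponds to a given delay in the `messageDelayLevel` list quoted in the producer's comment. The class `DelayLevels` and the method `levelFor` are hypothetical names made up for this illustration; they are not part of RocketMQ. The sketch assumes the broker keeps that default list and that levels are counted from 1.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: finds the 1-based delay level for a delay token such as "1h". */
final class DelayLevels {
    // The delay-level list quoted in the producer's comment (assumed unchanged on the broker).
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    private DelayLevels() {
    }

    /** Returns the 1-based level for the token, or -1 if the list has no such delay. */
    static int levelFor(String delay) {
        List<String> levels = Arrays.asList(DEFAULT_LEVELS.split(" "));
        int index = levels.indexOf(delay);
        return index < 0 ? -1 : index + 1;
    }

    public static void main(String[] args) {
        System.out.println("1h  -> level " + levelFor("1h"));
        System.out.println("45m -> level " + levelFor("45m"));
    }
}
```

Under these assumptions the one-hour timeout from the requirement maps to level 17, which is the value `p.getTimeLevel()` would need to return. A delay that is not in the list (45 minutes, for example) has no level, so with this approach only the configured delays are available.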
4. Straight to the code: the consumer

```java
@Component
public class OtcTimerConsumer {

    private final static Logger logger = LoggerFactory.getLogger(OtcTimerConsumer.class);

    private RuntimeSchema<String> schema = RuntimeSchema.createFrom(String.class);

    @Autowired
    private RedisService redisService; // Redis de-duplication, to prevent repeated consumption

    @Qualifier("borrowProcessTimerService")
    @Autowired
    private ProcessTimerService borrowProcessTimerService;

    @EventListener(condition = "#event.topic == 'OtcTimer'")
    public void rocketmqMsgListen(DefaultMQCustomerEvent event) throws Exception {
        try {
            // If the key has already been consumed, skip this message
            String key = event.getMsg().getKeys();
            Set<Object> set = this.redisService.getRepeat(key);
            if (set.size() > 0) {
                return;
            }

            // Deserialize the message body
            String paramter = schema.newMessage();
            ProtostuffIOUtil.mergeFrom(event.getMsg().getBody(), paramter, schema);
            if (StringUtils.isEmpty(paramter)) {
                throw new BusinessException(Codes.CODE_500, Messages.OTC_MQ_MESSAGE_ISNULL);
            }
            JSONObject jsonObject = JSONObject.parseObject(paramter);
            String orderNo = jsonObject.getString("orderNo");
            String method = jsonObject.getString("method");
            String invoke = jsonObject.getString("invoke");
            String type = jsonObject.getString("type");

            // Call the method named by "invoke" through reflection, passing (orderNo, method, invoke)
            if (type.equals(Datas.BORROW)) {
                this.borrowProcessTimerService.getClass()
                        .getMethod(invoke, String.class, String.class, String.class)
                        .invoke(this.borrowProcessTimerService, orderNo, method, invoke);
            }

            // Put the consumed key into the cache for de-duplication
            this.redisService.setRepeat("Otc:Timer:" + invoke + ":" + orderNo, paramter,
                    Double.valueOf(System.nanoTime()));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new BusinessException(Codes.CODE_500, e.getMessage());
        }
    }
}
```
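The two ideas in the consumer, de-duplication and dispatch by reflection, can be tried out without RocketMQ or Redis. The following is a minimal sketch under stated assumptions: an in-memory set stands in for the Redis de-duplication cache, and `TimerDispatchSketch` and `DemoTimerService` are hypothetical names invented for this example. It is not the article's `RedisService` or `ProcessTimerService`.

```java
import java.lang.reflect.Method;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: de-duplicate a timer message, then dispatch it by reflection. */
final class TimerDispatchSketch {

    /** Hypothetical stand-in for a timer service; it only counts its invocations. */
    public static class DemoTimerService {
        int calls = 0;

        public void orderTimer(String orderNo, String method, String invoke) {
            calls++;
        }
    }

    // In-memory stand-in for the Redis de-duplication cache (assumption).
    private final Set<String> consumed = ConcurrentHashMap.newKeySet();
    private final Object target;

    TimerDispatchSketch(Object target) {
        this.target = target;
    }

    /** Returns true if the handler ran, false if the message was a duplicate. */
    boolean handle(String orderNo, String method, String invoke) {
        String dedupKey = "Otc:Timer:" + invoke + ":" + orderNo;
        // add() is atomic: only the first caller for a given key gets true.
        if (!consumed.add(dedupKey)) {
            return false;
        }
        try {
            Method handler = target.getClass()
                    .getMethod(invoke, String.class, String.class, String.class);
            handler.invoke(target, orderNo, method, invoke);
            return true;
        } catch (ReflectiveOperationException e) {
            // Release the key so a redelivered message can be processed again.
            consumed.remove(dedupKey);
            throw new IllegalStateException("Dispatch failed for " + dedupKey, e);
        }
    }
}
```

One design choice differs from the consumer above: the sketch claims the key before invoking the handler and releases it on failure, so that two concurrent deliveries of the same message cannot both pass the check. With Redis, a comparable effect would need an atomic "set if absent" operation; whether the article's `RedisService` offers one is not shown.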
5. Straight to the code: the method called by reflection, which handles order cancellation and the penalty logic

```java
@Service
public class BorrowProcessTimerService extends BorrowSuperService implements ProcessTimerService {

    @Transactional(rollbackFor = Exception.class)
    public void orderTimer(String orderNo, String method, String invoke) {
        // Business logic goes here
        // I use a transaction plus a distributed lock to keep this safe
        // Handle exceptions and log them
    }
}
```
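The body of `orderTimer` is left out above, so here is a rough sketch of the shape such logic might take: lock the order, check that it is still unpaid, and only then cancel it. Everything in it is an assumption made for illustration. `OrderTimeoutSketch`, the `Status` enum, and the in-memory maps are hypothetical, and a per-order `ReentrantLock` stands in for a real distributed lock and database transaction.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: cancel an order on timeout only if it is still unpaid. */
final class OrderTimeoutSketch {
    enum Status { UNPAID, PAID, CANCELLED }

    // In-memory stand-ins for the order table and the distributed lock (assumptions).
    private final Map<String, Status> orders = new ConcurrentHashMap<>();
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    void create(String orderNo) {
        orders.put(orderNo, Status.UNPAID);
    }

    /** Marks the order as paid if it is still unpaid; returns whether the state changed. */
    boolean pay(String orderNo) {
        return transition(orderNo, Status.PAID);
    }

    /** Called when the delayed message arrives; cancels only an order that is still unpaid. */
    boolean cancelIfUnpaid(String orderNo) {
        return transition(orderNo, Status.CANCELLED);
    }

    Status statusOf(String orderNo) {
        return orders.get(orderNo);
    }

    // Moves an UNPAID order to the given state while holding that order's lock.
    private boolean transition(String orderNo, Status next) {
        ReentrantLock lock = locks.computeIfAbsent(orderNo, k -> new ReentrantLock());
        lock.lock();
        try {
            if (orders.get(orderNo) != Status.UNPAID) {
                return false; // already paid, already cancelled, or unknown order
            }
            orders.put(orderNo, next);
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

The state check inside the lock is what makes a repeated or late timer message harmless: an order that was paid in the meantime, or was already cancelled, is left untouched.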
Summary: (1) there is no need to poll all orders, so it is efficient; (2) the task runs only once per order; (3) the timeout fires close to the intended time.
If there are any mistakes in this post, please leave a comment. Thank you very much!