1. Background

Delayed tasks come up constantly in day-to-day development. Different scenarios call for different ways of implementing the delay, and how well you understand the trade-offs of each approach determines the quality of your architectural decisions.

This article uses the cancellation of overdue e-commerce orders as its business scenario, derives several solutions from it, and analyzes the advantages and disadvantages of each. The solutions covered are:

1. Database polling scheme.
2. Single-node memory solution.
3. Distributed delay queue scheme.

Finally, to improve development efficiency, we use a declarative programming approach to encapsulate the distributed delay queue scheme and cleanly separate business logic from technical plumbing.

1.1 Business scenario

The business scenario is a simple and familiar one: the e-commerce order. As many readers will have noticed, after we place an order on an e-commerce platform, if payment is not made within a certain period, the system automatically marks the order as timed out and cancels it in order to release the resources it holds.

The core process is as follows:

1. The user places an order on the e-commerce platform, which generates an order awaiting payment;
2. If payment is not completed within the specified time, the system automatically cancels the order and the order status changes to "Timeout cancellation";
3. If payment is completed within the specified time, the order status changes to "Paid".

The order state machine is as follows:

The state machine
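The transitions of the state machine can be sketched as a small enum. This is only an illustration: the article's own `OrderInfoStatus` enum is not shown, so the type name `OrderStatusSketch`, the `PAID` constant and the `canTransitionTo` helper are assumptions (only `CREATED` and `CANCELLED` appear in the code later on).

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of the order states; PAID and canTransitionTo are assumed, not taken from the article. */
enum OrderStatusSketch {
    CREATED, PAID, CANCELLED;

    /** States that can be reached directly from this state. */
    Set<OrderStatusSketch> nextStates() {
        // Only an unpaid (CREATED) order can move on; PAID and CANCELLED are terminal here.
        return this == CREATED
                ? EnumSet.of(PAID, CANCELLED)
                : EnumSet.noneOf(OrderStatusSketch.class);
    }

    /** True if an order in this state may move to the target state. */
    boolean canTransitionTo(OrderStatusSketch target) {
        return nextStates().contains(target);
    }
}
```

A guard such as `canTransitionTo(CANCELLED)` is one way to make sure a delayed cancellation never overwrites an order that has already been paid.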

1.2 Basic components overview

The whole demo follows DDD (domain-driven design) ideas. To make the rest easier to follow, we first introduce the basic components involved:

1.2.1. OrderInfo

The order aggregate root, which provides business methods such as create and cancel. The code is as follows:

@Data
@Entity
@Table(name = "order_info")
public class OrderInfo {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "status")
    @Enumerated(EnumType.STRING)
    private OrderInfoStatus orderStatus;

    @Column(name = "create_time")
    private Date createTime = new Date();

    /**
     * Cancel the order.
     */
    public void cancel() {
        setOrderStatus(OrderInfoStatus.CANCELLED);
    }

    /**
     * Create an order in the CREATED state.
     */
    public static OrderInfo create(Date createDate) {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setCreateTime(createDate);
        orderInfo.setOrderStatus(OrderInfoStatus.CREATED);
        return orderInfo;
    }
}

1.2.2. OrderInfoRepository

Implemented with Spring Data JPA and used mainly for database access. The code is as follows:

public interface OrderInfoRepository extends JpaRepository<OrderInfo, Long> {
    List<OrderInfo> getByOrderStatusAndCreateTimeLessThan(OrderInfoStatus created, Date overtime);
}

Spring Data generates proxy objects based on method signatures or @Query annotations, enabling basic database access without us having to write any code.

1.2.3. OrderInfoService

The application service layer. It is oriented toward use cases and mainly orchestrates the business process. The code is as follows:

@Service
@Slf4j
public class OrderInfoService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private OrderInfoRepository orderInfoRepository;

    /**
     * 1. Create the order and save it to the database
     * 2. Publish the domain event
     */
    @Transactional(readOnly = false)
    public void create(Date createDate) {
        OrderInfo orderInfo = OrderInfo.create(createDate);
        this.orderInfoRepository.save(orderInfo);
        eventPublisher.publishEvent(new OrderInfoCreateEvent(orderInfo));
    }

    /**
     * Cancel the order.
     */
    @Transactional(readOnly = false)
    public void cancel(Long orderId) {
        Optional<OrderInfo> orderInfoOpt = this.orderInfoRepository.findById(orderId);
        if (orderInfoOpt.isPresent()) {
            OrderInfo orderInfo = orderInfoOpt.get();
            orderInfo.cancel();
            this.orderInfoRepository.save(orderInfo);
            log.info("success to cancel order {}", orderId);
        } else {
            log.info("failed to find order {}", orderId);
        }
    }

    /**
     * Find unpaid orders that have timed out.
     * @return
     */
    @Transactional(readOnly = true)
    public List<OrderInfo> findOvertimeNotPaidOrders(Date deadLine) {
        return this.orderInfoRepository.getByOrderStatusAndCreateTimeLessThan(OrderInfoStatus.CREATED, deadLine);
    }
}

1.2.4. OrderController

The externally exposed web interface. It provides an endpoint for creating orders and is mainly used for testing. The code is as follows:

@RestController
@RequestMapping("order")
public class OrderController {
    @Autowired
    private OrderInfoService orderInfoService;

    /**
     * Generate a new order.
     */
    @PostMapping("insertTestData")
    public void createTestOrder() {
        // Back-date the order by 30 minutes minus 10 seconds,
        // so that with a 30-minute timeout it becomes overdue 10 seconds from now
        Date date = DateUtils.addMinutes(new Date(), -30);
        date = DateUtils.addSeconds(date, 10);
        this.orderInfoService.create(date);
    }
}

With the supporting components introduced, let's move on to the first scheme.

2. Database polling scheme

This is the simplest solution. Every order stores its creation time, so all you need is a scheduled task that queries the database for unpaid orders that have timed out and cancels them one by one.

2.1. Scheme implementation

The core process is as follows:

Database polling scheme

1. The user creates an order, and the order information is saved to the database.
2. A scheduled task triggers the check every second.
3. The task runs as follows:
   • Query the database for overdue orders;
   • Perform the cancel operation on each of them in turn;
   • Save the changes to the database;

The core code is as follows:

@Service
@Slf4j
public class DatabasePollStrategy {
    @Autowired
    private OrderInfoService orderInfoService;

    /**
     * Runs every second: <br/>
     * 1. Query the DB for overdue unpaid orders (status CREATED, created before the deadline)
     * 2. Cancel the orders one by one
     */
    @Scheduled(fixedDelay = 1 * 1000)
    public void poll() {
        Date now = new Date();
        Date overtime = DateUtils.addMinutes(now, -30);
        List<OrderInfo> overtimeNotPaidOrders = orderInfoService.findOvertimeNotPaidOrders(overtime);
        log.info("load overtime Not paid orders {}", overtimeNotPaidOrders);
        overtimeNotPaidOrders.forEach(orderInfo -> this.orderInfoService.cancel(orderInfo.getId()));
    }
}
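The selection rule applied on each polling pass can also be sketched without Spring or a database. The following is a minimal in-memory illustration, not the demo's implementation: the `PollingSketch` class, its simplified `Order` type and the `pollOnce` method are hypothetical names, and a plain list stands in for the `order_info` table.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for one polling pass; the real scheme queries the database instead. */
final class PollingSketch {
    /** Simplified order record (hypothetical; not the article's OrderInfo entity). */
    static final class Order {
        final long id;
        final Instant createTime;
        boolean paid;
        boolean cancelled;

        Order(long id, Instant createTime) {
            this.id = id;
            this.createTime = createTime;
        }
    }

    /** Cancels every pending order created before (now - timeout) and returns the cancelled ids. */
    static List<Long> pollOnce(List<Order> orders, Instant now, Duration timeout) {
        Instant deadline = now.minus(timeout);
        List<Long> cancelledIds = new ArrayList<>();
        for (Order order : orders) {
            boolean pending = !order.paid && !order.cancelled;
            if (pending && order.createTime.isBefore(deadline)) {
                order.cancelled = true;
                cancelledIds.add(order.id);
            }
        }
        return cancelledIds;
    }
}
```

Running `pollOnce` twice with the same clock cancels nothing the second time, which is exactly the wasted scan that the summary below lists as a drawback.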

2.2. Scheme summary

1. Advantages: simple
   • Easy to develop. Low system complexity, especially with the help of Spring Schedule;
   • Easy to test. There are no external dependencies and the logic is concentrated in one place, so problems are quick to locate;
   • Easy to launch. No tedious configuration and no complex application process;
2. Disadvantages:
   • Heavy database load. Non-stop polling increases the load on the database;
   • Poor timeliness. The maximum task delay equals the polling interval, so the scheme is not suitable for scenarios with strict timeliness requirements (it is sufficient for the order scenario);
   • Many wasted polls. When there are no overdue orders, every scan is wasted;
   • No peak shaving. If a large number of orders become overdue within a short time, the cancellations run all at once and create an obvious load peak;

In short, this scheme suits projects with a small business volume and fast iteration.

3. Single-node memory solution

For delayed tasks, the JDK already provides several tools we can use.

3.1 DelayQueue

DelayQueue is a special blocking queue. Each task is given a delay, and the task can be taken from the queue only after that delay has elapsed.
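The behavior can be tried in isolation with a few lines of plain JDK code. This is a minimal sketch: the `DelayQueueSketch` and `Task` names and the 50 ms / 200 ms delays are chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

final class DelayQueueSketch {
    /** Element that becomes available delayMillis after it is constructed (hypothetical helper type). */
    static final class Task implements Delayed {
        final String name;
        final long runAtNanos;

        Task(String name, long delayMillis) {
            this.name = name;
            this.runAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the task has expired
            return unit.convert(runAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // The task with the shorter remaining delay sorts first
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Offers two tasks out of order and returns their names in the order take() releases them. */
    static List<String> drainInExpiryOrder() {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.offer(new Task("late", 200));
        queue.offer(new Task("early", 50));
        List<String> order = new ArrayList<>();
        try {
            while (order.size() < 2) {
                order.add(queue.take().name); // blocks until the head task has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and return what we have
        }
        return order;
    }
}
```

Although "late" is offered first, `take()` hands out "early" first, because the queue orders its elements by remaining delay rather than by insertion order.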

The overall structure is as follows:

Delay queue

The core process is as follows:

1. After the user places an order, a task is submitted to the delay queue;
2. When the delay expires, the background worker thread takes the task from the queue;
3. The worker thread calls CancelOrder to cancel the overdue order.

The core code is as follows:

@Slf4j
@Service
public class DelayQueueStrategy implements SmartLifecycle {
    private final DelayQueue<DelayTask> delayTasks = new DelayQueue<>();
    private final Thread thread = new OrderCancelWorker();
    private boolean running;

    @Autowired
    private OrderInfoService orderInfoService;

    @TransactionalEventListener
    public void onOrderCreated(OrderInfoCreateEvent event) {
        // Put the order id into the delay queue
        this.delayTasks.offer(new DelayTask(event.getOrderInfo().getId(), 10));
        log.info("success to add Delay Task for Cancel Order {}", event.getOrderInfo().getId());
    }

    /**
     * Start the background worker thread.
     */
    @Override
    public void start() {
        if (this.running) {
            return;
        }
        this.thread.start();
        this.running = true;
    }

    /**
     * Stop the background worker thread.
     */
    @Override
    public void stop() {
        if (!this.running) {
            return;
        }
        this.thread.interrupt();
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /**
     * Delayed task.
     */
    @Value
    private static class DelayTask implements Delayed {
        private final Long orderId;
        private final Date runAt;

        private DelayTask(Long orderId, int delayTime) {
            this.orderId = orderId;
            this.runAt = DateUtils.addSeconds(new Date(), delayTime);
        }

        /**
         * Remaining time before the task may run.
         * @param timeUnit
         * @return
         */
        @Override
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(getRunAt().getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed delayed) {
            if (delayed == this) {
                return 0;
            } else {
                long d = this.getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
                // A shorter remaining delay must sort first
                return d == 0L ? 0 : (d < 0L ? -1 : 1);
            }
        }
    }

    private class OrderCancelWorker extends Thread {
        @Override
        public void run() {
            // Exit once the thread has been interrupted
            while (!Thread.currentThread().isInterrupted()) {
                DelayTask task = null;
                try {
                    // Take the next due task from the queue (blocks until one is due)
                    task = delayTasks.take();
                } catch (InterruptedException e) {
                    // take() clears the interrupt flag; restore it so the loop can exit
                    Thread.currentThread().interrupt();
                }
                // Cancel the order
                if (task != null) {
                    orderInfoService.cancel(task.getOrderId());
                    log.info("Success to Run Delay Task, Cancel Order {}", task.getOrderId());
                }
            }
        }
    }
}

The idea behind this scheme is simple, but the implementation carries some complexity because the life cycle of the worker thread has to be maintained by hand. The JDK already packages this scenario for us in ScheduledExecutorService, which is built on a delay queue.

3.2 ScheduledExecutorService

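ScheduledExecutorService is a scheduling component built on a delay queue. Compared with the older java.util.Timer, it has clear advantages: it can run tasks on a thread pool, and an exception thrown by one task does not stop the tasks scheduled after it.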

The overall structure is as follows:

ScheduledExecutorService

The core process is as follows:

1. After the user places an order, a scheduled task is registered with ScheduledExecutorService;
2. When the delay expires, ScheduledExecutorService starts the task;
3. A thread from the thread pool calls CancelOrder to cancel the overdue order.

The core code is as follows:

@Slf4j
@Service
public class ScheduleExecutorStrategy {
    @Autowired
    private OrderInfoService orderInfoService;

    private ScheduledExecutorService scheduledExecutorService;

    public ScheduleExecutorStrategy() {
        BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder()
                .namingPattern("Schedule-Cancel-Thread-%d")
                .daemon(true)
                .build();
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, basicThreadFactory);
    }

    @TransactionalEventListener
    public void onOrderCreated(OrderInfoCreateEvent event) {
        // Register the delayed task
        this.scheduledExecutorService.schedule(new CancelTask(event.getOrderInfo().getId()), 5, TimeUnit.SECONDS);
        log.info("Success to add cancel task for order {}", event.getOrderInfo().getId());
    }

    private class CancelTask implements Runnable {
        private final Long orderId;

        private CancelTask(Long orderId) {
            this.orderId = orderId;
        }

        @Override
        public void run() {
            orderInfoService.cancel(this.orderId);
            log.info("Success to cancel task for order {}", this.orderId);
        }
    }
}

Compared with DelayQueue, ScheduledExecutorService has much less code and avoids tedious details.
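One practical difference from the older `java.util.Timer` is that a failing task does not take the scheduler down with it. The sketch below illustrates this with plain JDK classes; the class and method names and the 20 ms / 60 ms delays are chosen only for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ScheduledExecutorSketch {
    /** Returns true if a task scheduled after a failing task still runs on the same single-thread pool. */
    static boolean laterTaskSurvivesEarlierFailure() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        CountDownLatch secondRan = new CountDownLatch(1);
        // The exception is captured in the task's Future instead of killing the worker thread
        Runnable failing = () -> {
            throw new IllegalStateException("simulated failure");
        };
        try {
            scheduler.schedule(failing, 20, TimeUnit.MILLISECONDS);
            scheduler.schedule(secondRan::countDown, 60, TimeUnit.MILLISECONDS);
            return secondRan.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

The flip side is that the failure is silent unless the returned `ScheduledFuture` is inspected or the task logs its own exceptions, so a cancel task should catch and log errors itself.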

3.3 Summary

Advantages:

1. No DB polling, which reduces the pressure on the database;
2. The overall scheme is simple and is built entirely from JDK components, with no additional dependencies;

Disadvantages:

1. Tasks are easily lost. Tasks are kept in memory, so they are lost if the service restarts or the server goes down;
2. A single-node approach has no cluster capability.

To solve the problems of the single-node memory scheme, we need to introduce a distributed scheme.

Within the single-node memory scheme, besides the delay queue implementations there is also the "time wheel" approach, which can greatly reduce the scheduling overhead when there are many tasks. Interested readers may want to study it.
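For readers who want a starting point, here is a minimal single-level time wheel. It is a sketch under simplifying assumptions: the wheel is advanced by explicit `tick()` calls instead of a timer thread, it is not thread-safe, and all names are hypothetical. Production implementations add a ticking thread, cancellation and often several wheel levels.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Minimal single-level hashed time wheel driven by explicit tick() calls (illustrative only). */
final class TimeWheelSketch {
    private static final class Entry {
        final Runnable task;
        int remainingRounds; // full wheel rotations still to wait

        Entry(Runnable task, int remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int current; // slot index processed by the most recent tick

    TimeWheelSketch(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules the task to run on the delayTicks-th call to tick() from now (delayTicks >= 1). */
    void schedule(Runnable task, int delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int size = slots.size();
        int slot = (current + delayTicks) % size;  // constant-time placement, no sorting
        int rounds = (delayTicks - 1) / size;      // extra rotations for delays longer than one lap
        slots.get(slot).add(new Entry(task, rounds));
    }

    /** Advances the wheel by one slot and runs the tasks that are due in that slot. */
    void tick() {
        current = (current + 1) % slots.size();
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = slots.get(current).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--;
            }
        }
        // Run after the scan so a task may safely schedule new work on the wheel
        for (Runnable task : due) {
            task.run();
        }
    }
}
```

Scheduling is a single array-index computation, whereas a heap-based delay queue pays a logarithmic cost per insertion; this is where the wheel gains its advantage when the number of pending tasks is large.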

4. Distributed delay queue scheme

In-memory queues have many limitations, so in practice we usually introduce a distributed solution.

4.1 Delay queue based on Redis

Redis is one of the most commonly used pieces of infrastructure. As a data structure server, its rich data structures can be combined into a variety of higher-level structures, and the delay queue is one of them.
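A common way to build such a queue by hand is a sorted set whose score is the due time: `ZADD` enqueues a member, and a poller periodically reads the members whose score is not later than the current time with `ZRANGEBYSCORE` and removes them with `ZREM`. The sketch below models only that idea in memory; a `TreeMap` stands in for the Redis sorted set, no Redis client is involved, and the class and method names are hypothetical. It is not a description of how any particular library implements its delay queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of a sorted-set delay queue; a TreeMap stands in for the Redis sorted set. */
final class SortedSetDelayQueueSketch {
    // score (due time in epoch millis) -> members due at that time
    private final TreeMap<Long, List<String>> sortedSet = new TreeMap<>();

    /** Models "ZADD key dueAtMillis member". */
    void offer(String member, long dueAtMillis) {
        sortedSet.computeIfAbsent(dueAtMillis, k -> new ArrayList<>()).add(member);
    }

    /** Models "ZRANGEBYSCORE key -inf nowMillis" followed by "ZREM" of the returned members. */
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        Map<Long, List<String>> head = sortedSet.headMap(nowMillis, true);
        for (List<String> members : head.values()) {
            due.addAll(members);
        }
        head.clear(); // clearing the view removes the entries from the underlying map
        return due;
    }
}
```

In a real deployment the read and the removal must happen atomically (for example in a Lua script), otherwise two pollers could hand out the same task.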

To avoid reinventing the wheel, we use the delay queue provided by Redisson directly.

The overall architecture is basically the same as the DelayQueue scheme, except that the in-memory DelayQueue is replaced by a distributed one, so it is not repeated here.

First, add the Redisson dependency to the POM:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.2</version>
</dependency>

Then, add the Redis configuration to the application configuration file:

spring.redis.host=127.0.0.1
spring.redis.port=6379
spring.redis.database=0

Finally, inject the core component, RedissonClient:

@Autowired
private RedissonClient redissonClient;

The code after integrating it into the process is as follows:

@Slf4j
@Service
public class RDelayQueueStrategy implements SmartLifecycle {
    private boolean running;
    private Thread thread = new OrderCancelWorker();
    private RBlockingQueue<Long> cancelOrderQueue;
    private RDelayedQueue<Long> delayedQueue;

    @Autowired
    private OrderInfoService orderInfoService;

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Create the Redis queues.
     */
    @PostConstruct
    public void init() {
        this.cancelOrderQueue = redissonClient.getBlockingQueue("DelayQueueForCancelOrder");
        this.delayedQueue = redissonClient.getDelayedQueue(cancelOrderQueue);
    }

    @TransactionalEventListener
    public void onOrderCreated(OrderInfoCreateEvent event) {
        this.delayedQueue.offer(event.getOrderInfo().getId(), 5L, TimeUnit.SECONDS);
        log.info("success to add Delay Task for Cancel Order {}", event.getOrderInfo().getId());
    }

    /**
     * Start the background worker thread.
     */
    @Override
    public void start() {
        if (this.running) {
            return;
        }
        thread.start();
        this.running = true;
    }

    /**
     * Stop the background worker thread.
     */
    @Override
    public void stop() {
        if (!this.running) {
            return;
        }
        thread.interrupt();
        this.running = false;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    private class OrderCancelWorker extends Thread {
        @Override
        public void run() {
            // Exit once the thread has been interrupted
            while (!Thread.currentThread().isInterrupted()) {
                Long orderId = null;
                try {
                    // Take the next due order id from the queue
                    orderId = cancelOrderQueue.take();
                } catch (InterruptedException e) {
                    // take() clears the interrupt flag; restore it so the loop can exit
                    Thread.currentThread().interrupt();
                }
                // Cancel the order
                if (orderId != null) {
                    orderInfoService.cancel(orderId);
                    log.info("Success to Run Delay Task, Cancel Order {}", orderId);
                }
            }
        }
    }
}

This scheme is very simple and fits most business scenarios. However, Redis favors availability over consistency (AP rather than CP), and messages may be lost during a cluster failover, so the RocketMQ scheme is recommended for scenarios with high consistency requirements.

4.2 Delay queue based on RocketMQ

RocketMQ is Alibaba's open-source distributed messaging middleware. Its overall design borrows many ideas from Kafka but adds a number of extensions for business scenarios, among which delayed messages are one of the most important.

The overall architecture design is as follows:

RocketMQ delay queue

The core process is as follows:

1. After the user places an order, a message is submitted to RocketMQ;
2. When the delay expires, the consumer thread obtains the message from the work queue;
3. After parsing the message, the consumer thread calls CancelOrder to cancel the overdue order.

First, add the RocketMQ dependency:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

Then, add the relevant configuration to the application configuration file:

rocketmq.name-server=http://127.0.0.1:9876
rocketmq.producer.group=delay-task-demo

Finally, we can send messages using RocketMQTemplate

@Autowired
private RocketMQTemplate rocketMQTemplate;

Note: the RocketMQ version used here does not support arbitrary delay times. Instead, it provides a fixed set of delay levels, which generally cover our business needs. If the fixed levels are not enough, the message can be delivered several times. For example, RocketMQ supports a maximum delay of 2 hours while the business needs a delay of 24 hours. In that case, put the expected execution time into the message body. When a message is received and the expected execution time has not yet been reached, send the message back to the delay queue; once the expected execution time has been reached, execute the task.
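The redelivery rule in the note can be sketched as a small helper that picks the next delay level from the remaining time. This is an illustration only: the class and method names are hypothetical, the level table is copied from the `messageDelayLevel` constant in the producer code below, and the policy of choosing the longest level that still fits is an assumption, not something RocketMQ prescribes.

```java
import java.util.concurrent.TimeUnit;

final class DelayLevelSketch {
    // Level table taken from the article's messageDelayLevel constant; level numbers start at 1
    private static final String LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    private static final long[] LEVEL_MILLIS = parse(LEVELS);

    private static long[] parse(String levels) {
        String[] parts = levels.trim().split("\\s+");
        long[] millis = new long[parts.length];
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            char unit = part.charAt(part.length() - 1);
            switch (unit) {
                case 's': millis[i] = TimeUnit.SECONDS.toMillis(amount); break;
                case 'm': millis[i] = TimeUnit.MINUTES.toMillis(amount); break;
                case 'h': millis[i] = TimeUnit.HOURS.toMillis(amount); break;
                default: throw new IllegalArgumentException("unknown unit in " + part);
            }
        }
        return millis;
    }

    /**
     * Returns 0 when the task is due, otherwise the highest level whose delay fits into the
     * remaining time (level 1 if less than the shortest delay remains, so the task may run slightly late).
     */
    static int nextLevel(long expectedRunAtMillis, long nowMillis) {
        long remaining = expectedRunAtMillis - nowMillis;
        if (remaining <= 0) {
            return 0;
        }
        int level = 1;
        for (int i = 0; i < LEVEL_MILLIS.length; i++) {
            if (LEVEL_MILLIS[i] <= remaining) {
                level = i + 1;
            }
        }
        return level;
    }

    /** Counts the deliveries needed to cover totalDelayMillis by applying nextLevel repeatedly. */
    static int hops(long totalDelayMillis) {
        long now = 0;
        int hops = 0;
        int level;
        while ((level = nextLevel(totalDelayMillis, now)) != 0) {
            now += LEVEL_MILLIS[level - 1];
            hops++;
        }
        return hops;
    }
}
```

With this table, a 24-hour delay is covered by twelve consecutive 2-hour deliveries (level 18), and a consumer would call something like `nextLevel` on every receipt to decide between executing the task and sending the message back.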

Sending delayed messages:

@Service
@Slf4j
public class RocketMQBasedDelayStrategy {
    private static final String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @TransactionalEventListener
    public void onOrderCreated(OrderInfoCreateEvent event) {
        // Send the order id to the RocketMQ delay queue
        Message<String> message = MessageBuilder
                .withPayload(String.valueOf(event.getOrderInfo().getId()))
                .build();
        // Timeout 200 ms, delay level 2 (5s in the level table above)
        this.rocketMQTemplate.syncSend("delay-task-topic", message, 200, 2);
        log.info("success to sent Delay Task to RocketMQ for Cancel Order {}", event.getOrderInfo().getId());
    }
}

Build the consumer that consumes the messages:

@Service
@Slf4j
@RocketMQMessageListener(topic = "delay-task-topic", consumerGroup = "delay-task-consumer-group")
public class RocketMQBasedDelayTaskConsumer implements RocketMQListener<MessageExt> {
    @Autowired
    private OrderInfoService orderInfoService;

    /**
     * Callback invoked when a message is received.
     * @param message
     */
    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        String idAsStr = new String(body);
        orderInfoService.cancel(Long.valueOf(idAsStr));
    }
}

4.3 Summary

The RocketMQ scheme is the one Internet companies typically use to implement delayed tasks.

Advantages, mainly from distributed service features:

1. High performance. As a proven tool for peak shaving and valley filling, it delivers high performance on the sending side, the server side and the consumer side;
2. High availability. Redis and RocketMQ both offer a rich set of deployment modes, which are the foundation of high availability;
3. Scalability. Redis and RocketMQ clusters both scale well.

Disadvantages:

1. Middleware is required. The scheme depends on infrastructure, and running Redis or RocketMQ increases operation and maintenance costs;
2. New APIs must be learned. Mastering a new API adds learning cost, and improper use can cause problems;

5. Declarative programming

One of the most important principles in architectural design is to effectively separate technology and business from each other.

5.1 Declarative programming

Declarative programming is a programming paradigm that stands in contrast to imperative programming. It describes the properties of the desired result, so the computer is told what the goal is rather than how to reach it, which avoids the side effects that come with step-by-step instructions. Imperative programming, by contrast, requires an algorithm that specifies exactly what to do at each step.

Every time a piece of middleware is introduced, developers need to learn a new set of APIs. Reducing this onboarding cost effectively is a big challenge, and declarative programming is one of the most commonly used ways to do it.

In simple terms, capabilities are abstracted so that they can be flexibly applied to desired scenarios through configuration.
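The idea can be shown in miniature with nothing but the JDK: an annotation declares the capability, and a dynamic proxy supplies the behavior. Everything in this sketch is hypothetical and chosen for illustration (the `@Deferred` annotation, the `OrderOperations` interface and the in-memory `PENDING` queue that stands in for a message broker); it is not the mechanism used later in this article, which relies on Spring AOP and RocketMQ.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Queue;

final class DeclarativeDelaySketch {
    /** Hypothetical marker annotation: calls to annotated methods are deferred. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Deferred { }

    /** Business interface; the annotation is the only "configuration" the business code needs. */
    interface OrderOperations {
        @Deferred
        void cancelOrder(long orderId);
    }

    /** Pending calls; stands in for the delay queue of a real message broker. */
    static final Queue<Runnable> PENDING = new ArrayDeque<>();

    /** Wraps the target so that @Deferred methods are queued instead of executed immediately. */
    static OrderOperations deferring(OrderOperations target) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.isAnnotationPresent(Deferred.class)) {
                PENDING.add(() -> {
                    try {
                        method.invoke(target, args);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                });
                return null; // the caller returns immediately; the work runs later
            }
            return method.invoke(target, args);
        };
        return (OrderOperations) Proxy.newProxyInstance(
                OrderOperations.class.getClassLoader(),
                new Class<?>[] {OrderOperations.class},
                handler);
    }

    /** Runs everything that was deferred (a real broker would do this when the delay expires). */
    static void runPending() {
        Runnable task;
        while ((task = PENDING.poll()) != null) {
            task.run();
        }
    }
}
```

The business implementation contains no queueing code at all; removing the annotation turns `cancelOrder` back into an ordinary synchronous call.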

First, let’s take a look at the final result:

@Service
@Slf4j
public class RocketMQBasedDelayService {
    @Autowired
    private OrderInfoService orderInfoService;

    /**
     * The RocketMQBasedDelay annotation marks this method as a delayed method. It does two things: <br />
     * 1. Based on AOP, intercepts calls to cancelOrder, converts the arguments to a Message, and sends it to the RocketMQ delay queue
     * 2. For the cancelOrder method, creates a DefaultMQPushConsumer and subscribes to the messages
     * @param orderId
     */
    @RocketMQBasedDelay(topic = "delay-task-topic-ann", delayLevel = 2, consumerGroup = "CancelOrderGroup")
    public void cancelOrder(Long orderId) {
        if (orderId == null) {
            log.info("param is invalidate");
            return;
        }
        this.orderInfoService.cancel(orderId);
        log.info("success to cancel Order for {}", orderId);
    }
}

Adding @RocketMQBasedDelay turns an ordinary method into a delayed one, which is the power of declarative programming:

1. Add the @RocketMQBasedDelay annotation to the method, and configure the delay queue name, the delay level and the consumer information;
2. When the method is called, it is not executed directly. Instead, the request is forwarded to the RocketMQ delay queue and the call returns immediately;
3. When the message delay expires, the consumer retrieves the message from the delay queue and calls the cancelOrder method to run the business logic.

This greatly reduces both the onboarding cost and the probability of errors.

5.2 Core Design

The core design is as follows:

RocketMQBasedDelay

At startup, two extension points were added:

1. Scan for methods annotated with @RocketMQBasedDelay and add the SendMessageInterceptor interceptor to them;
2. Scan for methods annotated with @RocketMQBasedDelay, create a RocketMQConsumerContainer managed object for each one, and complete the configuration and startup of its DefaultMQPushConsumer;

The specific execution process is as follows:

1. When the method is called, it is intercepted by SendMessageInterceptor, which changes the original execution flow. The new flow is:
   • Serialize the request parameters;
   • Use RocketMQTemplate to send a delayed message;
   • Return directly, skipping the original method call;
2. When the delay expires, the DefaultMQPushConsumer inside RocketMQConsumerContainer receives the message and processes it:
   • Deserialize the invocation parameters;
   • Call the business method;
   • Return the consumption status;
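The parameter round trip in this flow (arguments stored by position on the sending side, then rebuilt from the method's parameter types on the consuming side) can be sketched with plain reflection. This is an illustration under simplifying assumptions: values are stored as plain strings rather than through a real serializer, only a few parameter types are supported, and the `Canceller` bean and all other names are hypothetical.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

final class ArgumentRoundTripSketch {
    /** Producer side: store each argument as text under its position ("0", "1", ...). */
    static Map<String, String> serialize(Object[] args) {
        Map<String, String> payload = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            payload.put(String.valueOf(i), args[i] == null ? null : String.valueOf(args[i]));
        }
        return payload;
    }

    /** Consumer side: rebuild the arguments from the parameter types and invoke the method. */
    static Object invoke(Object bean, Method method, Map<String, String> payload) {
        Class<?>[] types = method.getParameterTypes();
        Object[] params = new Object[types.length];
        for (int i = 0; i < types.length; i++) {
            params[i] = convert(payload.get(String.valueOf(i)), types[i]);
        }
        try {
            return method.invoke(bean, params);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("failed to invoke " + method.getName(), e);
        }
    }

    // Only the types needed for the illustration; a real codec (for example JSON) would cover more
    private static Object convert(String text, Class<?> type) {
        if (text == null) return null;
        if (type == Long.class || type == long.class) return Long.valueOf(text);
        if (type == Integer.class || type == int.class) return Integer.valueOf(text);
        if (type == String.class) return text;
        throw new IllegalArgumentException("unsupported parameter type " + type.getName());
    }

    /** Hypothetical business bean used by the usage example. */
    static final class Canceller {
        Long lastCancelled;

        public void cancelOrder(Long orderId) {
            lastCancelled = orderId;
        }
    }

    /** Usage example: sends 42 through the round trip and returns what the bean received. */
    static Long demo() {
        Canceller bean = new Canceller();
        try {
            Method method = Canceller.class.getMethod("cancelOrder", Long.class);
            invoke(bean, method, serialize(new Object[] {42L}));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        return bean.lastCancelled;
    }
}
```

Keying the payload by position keeps the message format independent of parameter names, at the price that a change to the method signature breaks messages that are already in flight.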

5.3 Core Implementation

Core components fall into two main categories:

1. Working components.
   • SendMessageInterceptor. Intercepts the request and forwards it to the RocketMQ delay queue;
   • RocketMQConsumerContainer. Wraps DefaultMQPushConsumer: it configures the consumer, registers the listener, and triggers the task when a message arrives.
2. Configuration components.
   • RocketMQConsumerContainerRegistry. Scans the beans in the Spring container, wraps each method annotated with @RocketMQBasedDelay in a RocketMQConsumerContainer, and registers it in the Spring container;
   • RocketMQBasedDelayConfiguration. Registers the AOP interceptor and RocketMQConsumerContainerRegistry in the Spring container;

The RocketMQBasedDelay annotation:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RocketMQBasedDelay {
    /**
     * RocketMQ topic
     * @return
     */
    String topic();

    /**
     * Delay level
     * @return
     */
    int delayLevel();

    /**
     * Consumer group information
     * @return
     */
    String consumerGroup();
}

The annotation can be placed on methods and is retained at runtime, so it can be read through reflection.

The core code of SendMessageInterceptor is as follows:

/**
 * Intercepts method calls and sends the request to the RocketMQ topic.
 */
@Slf4j
public class SendMessageInterceptor implements MethodInterceptor {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public Object invoke(MethodInvocation methodInvocation) throws Throwable {
        Method method = methodInvocation.getMethod();
        // 1. Get the @RocketMQBasedDelay annotation on the method
        RocketMQBasedDelay rocketMQBasedDelay = method.getAnnotation(RocketMQBasedDelay.class);

        // 2. Convert the request arguments into an MQ message
        Object[] arguments = methodInvocation.getArguments();
        String argData = serialize(arguments);
        Message<String> message = MessageBuilder
                .withPayload(argData)
                .build();

        // 3. Send the message to MQ
        this.rocketMQTemplate.syncSend(rocketMQBasedDelay.topic(), message, 200, rocketMQBasedDelay.delayLevel());
        log.info("success to sent Delay Task to RocketMQ for {}", Arrays.toString(arguments));
        // The original method is not invoked here
        return null;
    }

    private String serialize(Object[] arguments) {
        // Key each serialized argument by its position
        Map<String, String> result = Maps.newHashMapWithExpectedSize(arguments.length);
        for (int i = 0; i < arguments.length; i++) {
            result.put(String.valueOf(i), SerializeUtil.serialize(arguments[i]));
        }
        return SerializeUtil.serialize(result);
    }
}

The RocketMQConsumerContainer source code is as follows:

/**
 * Consumer container, a wrapper around DefaultMQPushConsumer.
 */
@Data
@Slf4j
public class RocketMQConsumerContainer implements InitializingBean, SmartLifecycle {
    private DefaultMQPushConsumer consumer;
    private boolean running;
    private String consumerGroup;
    private String nameServerAddress;
    private String topic;
    private Object bean;
    private Method method;

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void start() {
        if (this.running) {
            return;
        }
        try {
            this.consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        this.running = true;
    }

    @Override
    public void stop() {
        this.running = false;
        this.consumer.shutdown();
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Build the DefaultMQPushConsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup(this.consumerGroup);
        consumer.setNamesrvAddr(this.nameServerAddress);
        // Subscribe to the topic
        consumer.subscribe(topic, "*");
        // Register the message listener
        consumer.setMessageListener(new DefaultMessageListenerOrderly());
        this.consumer = consumer;
    }

    private class DefaultMessageListenerOrderly implements MessageListenerOrderly {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    // Deserialize the arguments from the message body
                    byte[] body = messageExt.getBody();
                    String bodyAsStr = new String(body);
                    Map deserialize = SerializeUtil.deserialize(bodyAsStr, Map.class);
                    Object[] params = new Object[method.getParameterCount()];
                    for (int i = 0; i < method.getParameterCount(); i++) {
                        String o = (String) deserialize.get(String.valueOf(i));
                        if (o == null) {
                            params[i] = null;
                        } else {
                            params[i] = SerializeUtil.deserialize(o, method.getParameterTypes()[i]);
                        }
                    }
                    // Execute the business method
                    method.invoke(bean, params);
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                    // Retry the current queue after one second
                    context.setSuspendCurrentQueueTimeMillis(1000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
}

The RocketMQConsumerContainerRegistry source code is as follows:

/**
 * Processes each bean via BeanPostProcessor#postProcessAfterInitialization:
 * scans the bean for methods annotated with @RocketMQBasedDelay,
 * wraps each method in a RocketMQConsumerContainer,
 * and starts the DefaultMQPushConsumer.
 */
public class RocketMQConsumerContainerRegistry implements BeanPostProcessor {
    private final AtomicInteger id = new AtomicInteger(1);

    @Autowired
    private GenericApplicationContext applicationContext;

    @Value("${rocketmq.name-server}")
    private String nameServerAddress;

    /**
     * Processes each bean in turn.
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // 1. Get the methods annotated with @RocketMQBasedDelay
        Class targetCls = AopUtils.getTargetClass(bean);
        List<Method> methodsListWithAnnotation = MethodUtils.getMethodsListWithAnnotation(targetCls, RocketMQBasedDelay.class);

        // 2. Register a RocketMQConsumerContainer for each annotated method
        for (Method method : methodsListWithAnnotation) {
            String containerBeanName = targetCls.getName() + "#" + method.getName() + id.getAndIncrement();
            RocketMQBasedDelay annotation = method.getAnnotation(RocketMQBasedDelay.class);
            applicationContext.registerBean(containerBeanName, RocketMQConsumerContainer.class, () -> createContainer(bean, method, annotation));
        }
        return bean;
    }

    /**
     * Builds the RocketMQConsumerContainer.
     * @param proxy
     * @param method
     * @param annotation
     * @return
     */
    private RocketMQConsumerContainer createContainer(Object proxy, Method method, RocketMQBasedDelay annotation) {
        // Invoke the target object rather than the proxy, so the consumer does not re-send the message
        Object bean = AopProxyUtils.getSingletonTarget(proxy);
        RocketMQConsumerContainer container = new RocketMQConsumerContainer();
        container.setBean(bean);
        container.setMethod(method);
        container.setConsumerGroup(annotation.consumerGroup());
        container.setNameServerAddress(nameServerAddress);
        container.setTopic(annotation.topic());
        return container;
    }
}

The RocketMQBasedDelayConfiguration source code is as follows:

@Configuration
public class RocketMQBasedDelayConfiguration {
    /**
     * Declares the RocketMQConsumerContainerRegistry, which scans for
     * RocketMQBasedDelay methods, creates a DefaultMQPushConsumer for each
     * and completes the registration.
     * @return
     */
    @Bean
    public RocketMQConsumerContainerRegistry rocketMQConsumerContainerRegistry() {
        return new RocketMQConsumerContainerRegistry();
    }

    /**
     * Declares the AOP interceptor, which intercepts calls to methods
     * annotated with @RocketMQBasedDelay and sends the request to RocketMQ.
     * @return
     */
    @Bean
    public SendMessageInterceptor messageSendInterceptor() {
        return new SendMessageInterceptor();
    }

    /**
     * Binds the interceptor to methods annotated with @RocketMQBasedDelay.
     * @param sendMessageInterceptor
     * @return
     */
    @Bean
    public PointcutAdvisor pointcutAdvisor(@Autowired SendMessageInterceptor sendMessageInterceptor) {
        return new DefaultPointcutAdvisor(new AnnotationMatchingPointcut(null, RocketMQBasedDelay.class), sendMessageInterceptor);
    }
}

5.4 Summary

Declarative programming has a noticeable up-front design cost, but what it buys is ease of use. This kind of one-time investment that pays off many times over is highly recommended: it greatly improves development efficiency and reduces the probability of errors.

6. Summary

This article used the automatic cancellation of overdue orders as its business scenario and introduced the following schemes:

1. The DB polling scheme;
2. The single-node memory scheme based on DelayQueue and ScheduledExecutorService;
3. The distributed delay queue scheme based on Redis and RocketMQ;

The advantages and disadvantages of each scheme were described in detail, in the hope that readers can choose the best solution for their own business scenario in real development.

Finally, "declarative programming" was briefly introduced as a technical means of reducing the onboarding cost.

As usual, the source code is attached