I. Preface

Delayed tasks are widely used. Typical scenarios are automatically cancelling an order when payment times out and retrying a payment callback. Timeout cancellation of an order is idempotent, so repeated consumption is not a concern there. Payment callback retry, on the other hand, does have to account for repeated consumption.

Delayed tasks have two characteristics: they execute at a certain point in the future, and they generally execute only once.

(1) Implementation principle

The producer sends a message with a delay to a RabbitMQ exchange. When the delay expires, the exchange forwards the message to the bound queue, and the consumer consumes it by listening to that queue. The key to delayed tasks is that the message is held at the exchange until its delay has elapsed.
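To make the hold-then-forward behaviour concrete, here is a minimal in-memory sketch in plain Java. It only models the semantics described above and says nothing about how RabbitMQ implements them. The class `DelayedExchangeModel` and its methods are hypothetical names introduced for illustration, and the current time is passed in explicitly so that the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model: messages wait at the "exchange" until they are due, then move to the "queue". */
final class DelayedExchangeModel {

    /** A message held at the exchange together with the time it becomes due. */
    private static final class Held {
        final String body;
        final long dueAtMillis;

        Held(String body, long dueAtMillis) {
            this.body = body;
            this.dueAtMillis = dueAtMillis;
        }
    }

    // Messages held at the exchange, ordered by due time.
    private final PriorityQueue<Held> held =
            new PriorityQueue<>(Comparator.comparingLong((Held h) -> h.dueAtMillis));
    // The bound queue that a consumer would listen to.
    private final List<String> queue = new ArrayList<>();

    /** Producer side: publish a message that should be routed after delayMillis. */
    void publish(String body, long delayMillis, long nowMillis) {
        held.add(new Held(body, nowMillis + delayMillis));
    }

    /** Exchange side: forward every message whose delay has expired to the bound queue. */
    void tick(long nowMillis) {
        while (!held.isEmpty() && held.peek().dueAtMillis <= nowMillis) {
            queue.add(held.poll().body);
        }
    }

    /** Consumer side: a snapshot of the messages currently visible in the queue. */
    List<String> queued() {
        return new ArrayList<>(queue);
    }
}
```

In this model a message published at time 0 with a delay of 30000 ms is invisible to the consumer until `tick` is called with a time of at least 30000.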

Implementing delayed tasks on RabbitMQ in this way places high demands on server reliability. Delayed messages are kept on the node that hosts the exchange rather than in a replicated queue, so if that node is lost, for example in a single-machine deployment, delayed tasks that have not yet fired are lost with it.

(2) Component selection

II. Solution design

(1) Server

The RabbitMQ server needs the rabbitmq_delayed_message_exchange plug-in, which provides the x-delayed-message exchange type, to handle delayed messages.
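As a configuration fragment, a delayed exchange, a queue, and their binding could be declared with Spring AMQP roughly as follows. This is a sketch under assumptions: the class name `DelayExchangeSketchConfig` and the three string values are hypothetical, because the article's own RabbitmqConfig class is not shown, and `direct` is only one possible routing type.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayExchangeSketchConfig {
    // Hypothetical values; the article's RabbitmqConfig constants are not shown.
    static final String DELAY_EXCHANGE_NAME = "demo.delay.exchange";
    static final String DELAY_QUEUE_NAME = "demo.delay.queue";
    static final String DELAY_KEY = "demo.delay.key";

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        // How the exchange routes a message once its delay has expired.
        args.put("x-delayed-type", "direct");
        // "x-delayed-message" is the exchange type added by the plug-in.
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayQueue() {
        // Durable queue that receives messages after the delay.
        return new Queue(DELAY_QUEUE_NAME, true);
    }

    @Bean
    public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_KEY).noargs();
    }
}
```

The plug-in is enabled on the server with `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`, and each message carries its delay in milliseconds in the `x-delay` header.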

(2) Producers

Delayed tasks require the producer to deliver messages to the exchange reliably, so the publisher confirm mechanism can be used.

Once the order is generated, the order details are stored in Redis (persisted) with the order ID as the key, and the message is sent to RabbitMQ with an asynchronous confirm. If the broker confirms delivery, the entry keyed by the order ID is deleted from Redis to reclaim memory. Otherwise, the order data is read back from Redis by order ID and sent again.
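The store, send, delete-on-ack, resend-on-nack flow can be sketched in plain Java as follows. This is an illustration only: `PendingConfirmStore` is a hypothetical class, a `ConcurrentHashMap` stands in for Redis, and a `BiConsumer` stands in for the asynchronous publish call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Sketch of the "store, send, delete on ack, resend on nack" flow. */
final class PendingConfirmStore {
    // Stand-in for Redis: order ID -> order payload awaiting a broker confirm.
    private final Map<String, String> pending = new ConcurrentHashMap<>();
    // Stand-in for the asynchronous publish call: (orderId, payload).
    private final BiConsumer<String, String> sender;

    PendingConfirmStore(BiConsumer<String, String> sender) {
        this.sender = sender;
    }

    /** Save the payload first, then publish, so a refused message can be sent again. */
    void send(String orderId, String payload) {
        pending.put(orderId, payload);
        sender.accept(orderId, payload);
    }

    /** Called when the broker confirms (ack) or refuses (nack) the message. */
    void onConfirm(String orderId, boolean ack) {
        if (ack) {
            // Delivered: free the stored copy.
            pending.remove(orderId);
            return;
        }
        String payload = pending.get(orderId);
        if (payload != null) {
            // Not delivered: publish the stored copy again.
            sender.accept(orderId, payload);
        }
    }

    /** Number of messages still waiting for a positive confirm. */
    int pendingCount() {
        return pending.size();
    }
}
```

The sketch resends on every nack without a limit. A production version would probably cap the number of attempts, which is a design choice outside what the article describes.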

(3) Consumers

Delayed tasks also require the consumer to process messages without losing any. In practice this means: acknowledge messages manually to prevent message loss; keep the consumer side stable so that messages do not accumulate; and provide a retry mechanism for failed consumption.

Considering that delayed order cancellation is idempotent, there is no need to worry about repeated consumption of messages.
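Why a duplicate delivery is harmless here can be shown with a small plain-Java sketch. It is an illustration under assumptions: `OrderCancelSketch` is a hypothetical class, a map stands in for the order table, the status values 0 and -1 follow the consumer code in this article, and the value used for a paid order is made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch: closing an order only while it is still unpaid makes redelivery harmless. */
final class OrderCancelSketch {
    static final int UNPAID = 0;   // status checked by the article's consumer code
    static final int CLOSED = -1;  // status written by the article's consumer code
    static final int PAID = 1;     // assumed value, for illustration only

    // Stand-in for the order table: order ID -> status.
    private final Map<Long, Integer> statusById = new ConcurrentHashMap<>();

    void put(long orderId, int status) {
        statusById.put(orderId, status);
    }

    Integer status(long orderId) {
        return statusById.get(orderId);
    }

    /** Returns true only when this call actually closed the order. */
    boolean cancelIfUnpaid(long orderId) {
        // Atomic compare-and-set: UNPAID -> CLOSED, otherwise leave the order untouched.
        return statusById.replace(orderId, UNPAID, CLOSED);
    }
}
```

Calling `cancelIfUnpaid` a second time for the same order returns false and changes nothing, and a paid order is never closed.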

III. Spring Boot implementation

This section shows only the core source code. Please visit GitHub for the complete project.

(1) Producers

Because placing an order is a critical operation, the order is first persisted to the database, and only then are the subsequent steps carried out.

for (long i = 1; i <= 10; i++) {
    /* 1. Create the order */
    BuOrder order = createOrder(i);
    /* 2. Persist the order to the database */
    orderService.removeById(order);
    orderService.saveOrUpdate(order);
    /* 3. Save the order information to Redis */
    RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order);
    /* 4. Send the message to RabbitMQ asynchronously with a delay of 30000 ms */
    rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
}

Reliable message delivery on the producer side

public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    if (correlationData == null) {
        return;
    }
    String key = ORDER_PREFIX + correlationData.getId();
    if (ack) {
        /* The message was delivered: delete the order data in Redis to reclaim memory */
        RedisUtils.deleteObject(key);
    } else {
        /* Read the order data back from Redis */
        BuOrder order = RedisUtils.getObject(key, BuOrder.class);
        if (order == null) {
            /* Nothing is stored under this key, so there is nothing to resend */
            return;
        }
        /* Resend the message */
        rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
    }
}

(2) Consumers

The consumer acknowledges messages manually to avoid message loss. If consumption fails, the message is retried automatically.

@RabbitListener(queues = RabbitmqConfig.DELAY_QUEUE_NAME)
public void consumeNode01(Channel channel, Message message, BuOrder order) throws IOException {
    if (Objects.equals(0, order.getOrderStatus())) {
        /* The order is still open: change its status to closed */
        orderService.updateById(new BuOrder(order.getOrderId(), -1));
    }
    /* Acknowledge manually in every case, so a message for an already handled order does not stay unacknowledged */
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    log.info(String.format("Consumer node 01 consumed the message for order [%s]", order.getOrderId()));
}

For reliable consumption, run at least two consumer instances so that messages do not pile up in the queue.

(3) Common utility package

The code above uses a RabbitUtils utility class. It is provided by the following dependency, which wraps the most commonly used RabbitMQ helper methods.

<dependency>
  <groupId>xin.altitude.cms</groupId>
  <artifactId>ucode-cms-common</artifactId>
  <version>1.43.1.</version>
</dependency>
