Scenarios in which loss can occur during message delivery:

  1. Producer —— MSG ——> MQ. You can enable the message delivery result callback to ensure that each message receives a callback.
  2. MQ. Make queues and messages persistent and build mirrored cluster queues.
  3. MQ——-callback—-> producer. A callback fails. If a message has not received a callback for a period of time, the delivery fails by default and the producer needs to deliver the message to MQ again. (In this scenario, the same message will be delivered repeatedly, and the consumer needs to ensure message idempotent consumption.)

First, the idea of implementation

Technology used:

  • SpringBoot
  • RabbitMQ
  • Mysql
  • MybatisPlus
  • XxlJob

Two, preparation, frame building

  • Entity database:
    • Message
/** * Message sending history **@author futao
 * @date2020/3/31. * /
@Getter
@Setter
@Builder
@TableName("message")
public class Message extends IdTimeEntity {

    @Tolerate
    public Message(a) {}/** * The business data carried by the message */
    @TableField("msg_data")
    private String msgData;

    /** * Switch name */
    @TableField("exchange_name")
    private String exchangeName;

    /** * Routing key */
    @TableField("routing_key")
    private String routingKey;

    /** * Message status **@see com.futao.springboot.learn.rabbitmq.doc.reliabledelivery.model.enums.MessageStatusEnum
     */
    @TableField("status")
    private int status;

    /** * Number of retries */
    @TableField("retry_times")
    private int retryTimes;

    /** * Next retry time */
    @TableField("next_retry_date_time")
    private LocalDateTime nextRetryDateTime;

}
Copy the code
  • Message state enumeration
/** * Message status enumeration **@author futao
 * @date2020/3/31. * /
@Getter
@AllArgsConstructor
public enum MessageStatusEnum {

    /** * 1= Sending */
    SENDING(1."Sending"),

    /** * 2= Failed to send */
    SUCCESS(2."Sent successfully"),

    /** * 3= Failed to send */
    FAIL(3."Send failed");

    private int status;
    private String description;
}
Copy the code
  • The RabbitMQ configuration
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: reliable-delivery
    connection-timeout: 15000
    # send confirmation
    publisher-confirms: true
    Route failed callback
    publisher-returns: true
    template:
      # Must be set to true to notify listeners of message routing failures rather than discarding messages
      mandatory: true

app:
  rabbitmq:
    retry:
      # Maximum number of message retries
      max-retry-times: 5
      Retry interval
      retry-interval: 5s
    # queue definition
    queue:
      user: user-queue
    Switch definition
    exchange:
      user: user-exchange
Copy the code

Three, coding

  • Queue switch definition and binding
/** * RabbitMQ queue definition and binding **@author futao
 * @date2020/3/31. * /
@Configuration
public class Declare {

    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName) {
        return QueueBuilder
                .durable(userQueueName)
                //.withArgument("x-max-length", 2)
                .build();
    }

    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*") .noargs(); }}Copy the code
  • To enhance RabbitTemplate, setconfirmCallback()The message delivery callback method andreturnCallback()Message routing failed callback method
/** * Bean enhancement * [Serious warning] : Beans cannot be injected into this class. Injected beans will not be enhanced by BeanPostProcessor. * The Bean to inject must be retrieved from the container * *@author futao
 * @date2020/3/20. * /
@Slf4j
@Component
public class BeanEnhance implements BeanPostProcessor {

// @Resource
// private MessageMapper messageMapper;

    /** * Maximum number of retries for a message */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int maxRetryTimes;

    /** * Retry interval */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

// @Autowired
// private RabbitTemplate rabbitTemplate;
//
// @Autowired
// private BeanEnhance enhance;


    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        / / enhance RabbitTemplate
        if (RabbitTemplate.class.equals(bean.getClass())) {
            // This can be used to ensure that 100% of messages are delivered to rabbitMQ. (If a message (determined by ID) has not received the callback within a certain period of time, resend the message)
            // Need to set publisher-Confirm: true
            ((RabbitTemplate) bean).setConfirmCallback((correlationData, ack, cause) -> {
                String correlationDataId = correlationData.getId();
                if (ack) {
                    //ACK
                    log.debug("Message [{}] delivered successfully, set message status in DB to delivered successfully", correlationDataId);
                    ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                            Wrappers.<Message>lambdaUpdate()
                                    .set(Message::getStatus, MessageStatusEnum.SUCCESS.getStatus())
                                    .eq(Message::getId, correlationDataId)
                    );
                } else {
                    log.debug("Message [{}] delivery failed,cause:{}", correlationDataId, cause);
                    //NACK, the message is retransmittedApplicationContextHolder.getBean(BeanEnhance.class).reSend(correlationDataId); }});// Message route failed callback -- Publisher-returns: true and template: Mandatory: true must be set otherwise Rabbit will discard the message
            ((RabbitTemplate) bean).setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.warn("Message routing failed callback... Do some compensation or record...");
                log.warn("message{}", message);
                log.warn("replyCode{}", replyCode);
                log.warn("replyText{}", replyText);
                log.warn("exchange{}", exchange);
                log.warn("routingKey{}", routingKey);
            });
        }
        return bean;
    }


    /** * retransmit message when NACK **@param correlationDataId
     */
    @Transactional(rollbackFor = Exception.class)
    public void reSend(String correlationDataId) {
        Message message = ApplicationContextHolder.getBean(MessageMapper.class).selectById(correlationDataId);
        if (message.getRetryTimes() < maxRetryTimes) {
            // Retry
            ApplicationContextHolder.getBean(RabbitTemplate.class).convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
            // Update the DB message status
            ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                    Wrappers.<Message>lambdaUpdate()
                            .set(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                            .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                            .set(Message::getRetryTimes, message.getRetryTimes() + 1) .eq(Message::getId, correlationDataId) ); }}}Copy the code
  • Producers:

/ * * *@author futao
 * @date2020/3/31. * /
@Component
public class Sender {

    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchangeName;

    public void send(User user) {
        Message message = Message.builder()
                .msgData(JSON.toJSONString(user))
                .exchangeName(userExchangeName)
                .routingKey("user.abc")
                .status(MessageStatusEnum.SENDING.getStatus())
                // Next retry time
                .nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                .retryTimes(0)
                .build();
        // Message drop
        messageMapper.insert(
                message
        );
        CorrelationData correlationData = new CorrelationData(message.getId());
        // Messages are posted to MQ
        rabbitTemplate.convertAndSend(userExchangeName, "user.abc", JSON.toJSONString(user), correlationData); }}Copy the code
  • The scheduled task scans the message status in the DB. If there are messages in the process of sending, and the current time >= next delivery time and retries <= Maximum number of retries, the task sends messages again.
    • XxlJob configuration
xxl:
  job:
    switch: ON
    admin:
      ### Dispatch center deployment and ADDRESS [Optional] : If multiple addresses exist in the dispatch center cluster deployment, separate them with commas (,). The executor will use this address for "executor heartbeat registration" and "task result callback". If it is null, auto registration is disabled.
      addresses: http://127.0.0.1:9090/xxl-job-admin
    executor:
      ### executor AppName [optional] : executor heartbeat registration group basis; If empty, auto registration is disabled
      appname: xxl-job-executor-rabbitmq
      [Optional] : The default value is blank to indicate that the IP address is automatically obtained. When multiple network cards are used, the specified IP address can be manually set. The IP address will not be bound to Host and is only used for communication. Address information is used for "actuator registration" and "dispatch center requests and triggers tasks";
      # ip:
      ### actuator port number [optional] : if less than or equal to 0, automatically obtain; The default port number is 9999. If multiple actuators are deployed on a single machine, configure different actuators.
      port: 9999
      [Optional] : You need to have read and write permission on the disk where the run log files are stored. If it is empty, the default path is used.
      logpath: data/applogs/xxl-job/jobhandler
      [Optional] : Expiration logs are automatically cleared. The expiration logs take effect when the limit value is greater than or equal to 3. Otherwise, for example, -1, disable the automatic clearing function.
      logretentiondays: 30
    ### Actuator communication TOKEN [optional] : non-space enabled;
    accessToken:
Copy the code
  • Configuration
/** * xxl-job Configures **@author futao
 * @date2020/4/1. * /
@Setter
@Getter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobConfig {


    private final Admin admin = new Admin();
    private final Executor executor = new Executor();

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor(XxlJobConfig xxlJobConfig) {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(xxlJobConfig.getAdmin().getAddresses());
        xxlJobSpringExecutor.setAppName(xxlJobConfig.getExecutor().getAppName());
        xxlJobSpringExecutor.setIp(xxlJobConfig.getExecutor().getIp());
        xxlJobSpringExecutor.setPort(xxlJobConfig.getExecutor().getPort());
        xxlJobSpringExecutor.setLogPath(xxlJobConfig.getExecutor().getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(xxlJobConfig.getExecutor().getLogRetentionDays());
        return xxlJobSpringExecutor;
    }

    @Getter
    @Setter
    public static class Admin {
        private String addresses;
    }

    @Getter
    @Setter
    public static class Executor {
        private String appName;
        private String ip;
        private int port;
        private String logPath;
        private intlogRetentionDays; }}Copy the code
  • Write periodic scan tasks

/** * scans the database for messages that need to be redelivered and redelivers **@author futao
 * @date2020/4/1. * /
@Slf4j
@Component
public class MessageReSendJob extends IJobHandler {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Autowired
    private MessageReSendJob messageReSendJob;

    /** * Maximum number of retries */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int retryTimes;

    /** * Retry interval */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    /** * Batch read from the database */
    private static final int PAGE_SIZE = 100;


    @XxlJob(value = "MessageReSendJob", init = "jobHandlerInit", destroy = "jobHandlerDestroy")
    @Override
    public ReturnT<String> execute(String s) throws Exception {
        long startTime = System.currentTimeMillis();
        log.info("Start scanning for messages to be retried.");
        XxlJobLogger.log("Start scanning for messages to be retried.");
        service(1);
        log.info("Scanning for messages to be retried is complete, [{}]ms", System.currentTimeMillis() - startTime);
        XxlJobLogger.log("Scanning for messages to be retried is complete, [{}]ms", System.currentTimeMillis() - startTime);
        return ReturnT.SUCCESS;
    }

    public void service(int pageNum) {
        IPage<Message> messageIPage = messageMapper.selectPage(new Page<>(pageNum, PAGE_SIZE),
                Wrappers.<Message>lambdaQuery()
                        // Message being sent
                        .eq(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                        // The next delivery time has been reached
                        .le(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)))); List<Message> messages = messageIPage.getRecords(); messages.forEach(message -> {if (retryTimes <= message.getRetryTimes()) {
                // The maximum number of deliveries has been reached, and the message is set to delivery failure
                messageMapper.update(null, Wrappers.<Message>lambdaUpdate().set(Message::getStatus, MessageStatusEnum.FAIL.getStatus()).eq(Message::getId, message.getId()));
            } else{ messageReSendJob.reSend(message); }});if(PAGE_SIZE == messages.size()) { service(++pageNum); }}/** * repost the message **@param message
     */
    public void reSend(Message message) {
        messageMapper.update(null,
                Wrappers.<Message>lambdaUpdate()
                        .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                        .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                        .eq(Message::getId, message.getId())
        );
        try {
            // Repost
            rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(message.getId()));
        } catch (Exception e) {
            log.error("Message [{}] delivery failed", JSON.toJSONString(message)); }}public void jobHandlerInit(a) {
        log.info("before job execute...");
        XxlJobLogger.log("before job handler init...");
    }

    public void jobHandlerDestroy(a) {
        log.info("after job execute...");
        XxlJobLogger.log("after job execute..."); }}Copy the code
  • XxlJob Adds a scheduling task

Four, test,

  • The test interface
/ * * *@author futao
 * @date2020/4/1. * /
@RequestMapping("/user")
@RestController
public class UserController {

    @Autowired
    private Sender sender;

    @RequestMapping("/send")
    public void send(a) {
        sender.send(User
                .builder()
                .userName("Astronomical")
                .birthday(LocalDate.of(1995.1.31))
                .address("Hangzhou, Zhejiang") .build()); }}Copy the code
  • Normal scenario:

Message dropped, status 1= sending

The callback

The message is set to post successfully

  • Abnormal scenario

Stop MQ sending messages after starting the producer service

Failed to receive an ACK for this message. So it’s always sending. Enable task scheduling to deliver again (the number of deliveries +1, and update the next delivery time)

When the maximum number of deliveries is reached, next time, the message is set to deliver failed

Operation log

# Next

  • Reliable information consumption
  • Traffic limiting protection at the consumer end
  • Dead-letter queue
  • Delays in the queue