Scenarios in which loss can occur during message delivery:

  1. Producer —— MSG ——> MQ. You can enable the message delivery result callback to ensure that each message receives a callback.
  2. MQ. Make queues and messages persistent and build mirrored cluster queues.
  3. MQ——-callback—-> producer. A callback fails. If a message has not received a callback for a period of time, the delivery fails by default and the producer needs to deliver the message to MQ again. (In this scenario, the same message will be delivered repeatedly, and the consumer needs to ensure message idempotent consumption.)

First, the idea of implementation

Technology used:

  • SpringBoot
  • RabbitMQ
  • Mysql
  • MybatisPlus
  • XxlJob

Two, preparation, frame building

  • Entity database:
    • Message
/** * Message sending history **@author futao
 * @date2020/3/31. * /
public class Message extends IdTimeEntity {

    public Message(a) {}/** * The business data carried by the message */
    private String msgData;

    /** * Switch name */
    private String exchangeName;

    /** * Routing key */
    private String routingKey;

    /** * Message status **@see com.futao.springboot.learn.rabbitmq.doc.reliabledelivery.model.enums.MessageStatusEnum
    private int status;

    /** * Number of retries */
    private int retryTimes;

    /** * Next retry time */
    private LocalDateTime nextRetryDateTime;

  • Message state enumeration
/** * Message status enumeration **@author futao
 * @date2020/3/31. * /
public enum MessageStatusEnum {

    /** * 1= Sending */

    /** * 2= Failed to send */
    SUCCESS(2."Sent successfully"),

    /** * 3= Failed to send */
    FAIL(3."Send failed");

    private int status;
    private String description;
  • The RabbitMQ configuration
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: reliable-delivery
    connection-timeout: 15000
    # send confirmation
    publisher-confirms: true
    Route failed callback
    publisher-returns: true
      # Must be set to true to notify listeners of message routing failures rather than discarding messages
      mandatory: true

      # Maximum number of message retries
      max-retry-times: 5
      Retry interval
      retry-interval: 5s
    # queue definition
      user: user-queue
    Switch definition
      user: user-exchange
Three, coding

  • Queue switch definition and binding
/** * RabbitMQ queue definition and binding **@author futao
 * @date2020/3/31. * /
public class Declare {

    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName) {
        return QueueBuilder
                //.withArgument("x-max-length", 2)

    public Exchange userExchange(@Value("${}") String userExchangeName) {
        return ExchangeBuilder

    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
  • To enhance RabbitTemplate, setconfirmCallback()The message delivery callback method andreturnCallback()Message routing failed callback method
/** * Bean enhancement * [Serious warning] : Beans cannot be injected into this class. Injected beans will not be enhanced by BeanPostProcessor. * The Bean to inject must be retrieved from the container * *@author futao
 * @date2020/3/20. * /
public class BeanEnhance implements BeanPostProcessor {

// @Resource
// private MessageMapper messageMapper;

    /** * Maximum number of retries for a message */
    private int maxRetryTimes;

    /** * Retry interval */
    private Duration retryInterval;

// @Autowired
// private RabbitTemplate rabbitTemplate;
// @Autowired
// private BeanEnhance enhance;

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        / / enhance RabbitTemplate
        if (RabbitTemplate.class.equals(bean.getClass())) {
            // This can be used to ensure that 100% of messages are delivered to rabbitMQ. (If a message (determined by ID) has not received the callback within a certain period of time, resend the message)
            // Need to set publisher-Confirm: true
            ((RabbitTemplate) bean).setConfirmCallback((correlationData, ack, cause) -> {
                String correlationDataId = correlationData.getId();
                if (ack) {
                    log.debug("Message [{}] delivered successfully, set message status in DB to delivered successfully", correlationDataId);
                                    .set(Message::getStatus, MessageStatusEnum.SUCCESS.getStatus())
                                    .eq(Message::getId, correlationDataId)
                } else {
                    log.debug("Message [{}] delivery failed,cause:{}", correlationDataId, cause);
                    //NACK, the message is retransmittedApplicationContextHolder.getBean(BeanEnhance.class).reSend(correlationDataId); }});// Message route failed callback -- Publisher-returns: true and template: Mandatory: true must be set otherwise Rabbit will discard the message
            ((RabbitTemplate) bean).setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.warn("Message routing failed callback... Do some compensation or record...");
                log.warn("message{}", message);
                log.warn("replyCode{}", replyCode);
                log.warn("replyText{}", replyText);
                log.warn("exchange{}", exchange);
                log.warn("routingKey{}", routingKey);
        return bean;

    /** * retransmit message when NACK **@param correlationDataId
    @Transactional(rollbackFor = Exception.class)
    public void reSend(String correlationDataId) {
        Message message = ApplicationContextHolder.getBean(MessageMapper.class).selectById(correlationDataId);
        if (message.getRetryTimes() < maxRetryTimes) {
            // Retry
            ApplicationContextHolder.getBean(RabbitTemplate.class).convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
            // Update the DB message status
                            .set(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
  • Producers:

/ * * *@author futao
 * @date2020/3/31. * /
public class Sender {

    private Duration retryInterval;

    private RabbitTemplate rabbitTemplate;

    private MessageMapper messageMapper;

    private String userExchangeName;

    public void send(User user) {
        Message message = Message.builder()
                // Next retry time
        // Message drop
        CorrelationData correlationData = new CorrelationData(message.getId());
        // Messages are posted to MQ
  • The scheduled task scans the message status in the DB. If there are messages in the process of sending, and the current time >= next delivery time and retries <= Maximum number of retries, the task sends messages again.
    • XxlJob configuration
    switch: ON
      ### Dispatch center deployment and ADDRESS [Optional] : If multiple addresses exist in the dispatch center cluster deployment, separate them with commas (,). The executor will use this address for "executor heartbeat registration" and "task result callback". If it is null, auto registration is disabled.
      ### executor AppName [optional] : executor heartbeat registration group basis; If empty, auto registration is disabled
      appname: xxl-job-executor-rabbitmq
      [Optional] : The default value is blank to indicate that the IP address is automatically obtained. When multiple network cards are used, the specified IP address can be manually set. The IP address will not be bound to Host and is only used for communication. Address information is used for "actuator registration" and "dispatch center requests and triggers tasks";
      # ip:
      ### actuator port number [optional] : if less than or equal to 0, automatically obtain; The default port number is 9999. If multiple actuators are deployed on a single machine, configure different actuators.
      port: 9999
      [Optional] : You need to have read and write permission on the disk where the run log files are stored. If it is empty, the default path is used.
      logpath: data/applogs/xxl-job/jobhandler
      [Optional] : Expiration logs are automatically cleared. The expiration logs take effect when the limit value is greater than or equal to 3. Otherwise, for example, -1, disable the automatic clearing function.
      logretentiondays: 30
    ### Actuator communication TOKEN [optional] : non-space enabled;
  • Configuration
/** * xxl-job Configures **@author futao
 * @date2020/4/1. * /
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobConfig {

    private final Admin admin = new Admin();
    private final Executor executor = new Executor();

    public XxlJobSpringExecutor xxlJobExecutor(XxlJobConfig xxlJobConfig) {">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        return xxlJobSpringExecutor;

    public static class Admin {
        private String addresses;

    public static class Executor {
        private String appName;
        private String ip;
        private int port;
        private String logPath;
  • Write periodic scan tasks

/** * scans the database for messages that need to be redelivered and redelivers **@author futao
 * @date2020/4/1. * /
public class MessageReSendJob extends IJobHandler {

    private RabbitTemplate rabbitTemplate;

    private MessageMapper messageMapper;

    private MessageReSendJob messageReSendJob;

    /** * Maximum number of retries */
    private int retryTimes;

    /** * Retry interval */
    private Duration retryInterval;

    /** * Batch read from the database */
    private static final int PAGE_SIZE = 100;

    @XxlJob(value = "MessageReSendJob", init = "jobHandlerInit", destroy = "jobHandlerDestroy")
    public ReturnT<String> execute(String s) throws Exception {
        long startTime = System.currentTimeMillis();"Start scanning for messages to be retried.");
        XxlJobLogger.log("Start scanning for messages to be retried.");
        service(1);"Scanning for messages to be retried is complete, [{}]ms", System.currentTimeMillis() - startTime);
        XxlJobLogger.log("Scanning for messages to be retried is complete, [{}]ms", System.currentTimeMillis() - startTime);
        return ReturnT.SUCCESS;

    public void service(int pageNum) {
        IPage<Message> messageIPage = messageMapper.selectPage(new Page<>(pageNum, PAGE_SIZE),
                        // Message being sent
                        .eq(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                        // The next delivery time has been reached
                        .le(Message::getNextRetryDateTime,; List<Message> messages = messageIPage.getRecords(); messages.forEach(message -> {if (retryTimes <= message.getRetryTimes()) {
                // The maximum number of deliveries has been reached, and the message is set to delivery failure
                messageMapper.update(null, Wrappers.<Message>lambdaUpdate().set(Message::getStatus, MessageStatusEnum.FAIL.getStatus()).eq(Message::getId, message.getId()));
            } else{ messageReSendJob.reSend(message); }});if(PAGE_SIZE == messages.size()) { service(++pageNum); }}/** * repost the message **@param message
    public void reSend(Message message) {
                        .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                        .eq(Message::getId, message.getId())
        try {
            // Repost
            rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(message.getId()));
        } catch (Exception e) {
            log.error("Message [{}] delivery failed", JSON.toJSONString(message)); }}public void jobHandlerInit(a) {"before job execute...");
        XxlJobLogger.log("before job handler init...");

    public void jobHandlerDestroy(a) {"after job execute...");
  • XxlJob Adds a scheduling task

Four, test,

  • The test interface
/ * * *@author futao
 * @date2020/4/1. * /
public class UserController {

    private Sender sender;

    public void send(a) {
  • Normal scenario:

Message dropped, status 1= sending

The callback

The message is set to post successfully

  • Abnormal scenario

Stop MQ sending messages after starting the producer service

Failed to receive an ACK for this message. So it’s always sending. Enable task scheduling to deliver again (the number of deliveries +1, and update the next delivery time)

When the maximum number of deliveries is reached, next time, the message is set to deliver failed

Operation log

# Next

  • Reliable information consumption
  • Traffic limiting protection at the consumer end
  • Dead-letter queue
  • Delays in the queue