1. Problem Description:

During O&M monitoring, we found that messages had piled up in a RabbitMQ queue.

The queue showed 2000 Unacked messages and 652 Ready messages.

2. Problem location

We traced the queue to the corresponding service and business code, then checked message consumption in Kibana using the consumer-side logs:

The logs show that messages were consumed and processed normally until 9:20.

In the code, the consumer's listener specifies a containerFactory.

The batchQueueRabbitListenerContainerFactory container factory is configured for manual acknowledgement, but the consumer's processing code never acks manually!

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

public SimpleRabbitListenerContainerFactory batchQueueRabbitListenerContainerFactory(
        @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory,
        MessageConverter messageConverter) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    // Enable batch consumption
    factory.setBatchListener(true);
    factory.setConsumerBatchEnabled(true);
    factory.setBatchSize(100);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setMessageConverter(messageConverter);
    return factory;
}
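To see why a missing manual ack produces exactly this kind of Unacked/Ready split, here is a minimal toy model in plain Java. It is not the RabbitMQ client API: the class `UnackedBacklogDemo`, its methods, and the prefetch value of 2000 are all hypothetical, and the numbers are chosen only to mirror the monitoring figures above, not to claim what the real prefetch setting was. The idea it sketches is that the broker stops delivering once the number of outstanding (unacked) deliveries reaches the prefetch limit, so everything else waits in Ready.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a queue with a prefetch limit; not the RabbitMQ client API. */
public class UnackedBacklogDemo {
    private final Deque<Integer> ready = new ArrayDeque<>();
    private final Deque<Integer> unacked = new ArrayDeque<>();
    private final int prefetch; // hypothetical limit on outstanding deliveries

    public UnackedBacklogDemo(int prefetch) {
        this.prefetch = prefetch;
    }

    /** Put `count` new messages into the Ready state. */
    public void publish(int count) {
        for (int i = 0; i < count; i++) {
            ready.addLast(i);
        }
    }

    /** Deliver until the prefetch limit is hit; returns how many were delivered. */
    public int deliver() {
        int delivered = 0;
        while (!ready.isEmpty() && unacked.size() < prefetch) {
            unacked.addLast(ready.pollFirst());
            delivered++;
        }
        return delivered;
    }

    /** Ack the oldest outstanding delivery, freeing one prefetch slot. */
    public void ackOne() {
        unacked.pollFirst();
    }

    public int readyCount() {
        return ready.size();
    }

    public int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        UnackedBacklogDemo queue = new UnackedBacklogDemo(2000);
        queue.publish(2652);
        queue.deliver(); // the consumer processes these but never acks
        queue.deliver(); // no free slot, so nothing more is delivered
        System.out.println("Unacked=" + queue.unackedCount() + ", Ready=" + queue.readyCount());
        // prints "Unacked=2000, Ready=652"
    }
}
```

In this model, only a call to `ackOne()` frees a slot for the next delivery, which is why a consumer that never acks stalls even though its processing code runs without errors.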

To clear the online backlog quickly, we changed the manual ack to automatic ack.

Once the change went live, something strange happened. Messages in MQ were now consumed quickly and no longer piled up, but the buffer table used for message processing kept growing. It dropped occasionally, but far more slowly than it grew, and soon it held tens of thousands of rows. (The flow is roughly: write buffer table -> send MQ -> consume MQ -> run business logic -> delete the buffer record.)

So we immediately checked the logs printed by the consumer service:

Each processed MQ message deletes its row from the buffer table. The logs showed that the consumer was running, but it only processed a message every few minutes, while buffer-table data kept accumulating. At first we habitually assumed that the business side was simply slow because of the message volume, or that the business logic itself was time-consuming, but there was no lock wait on the data. When something looks this abnormal there has to be a cause, so I went back through the code and found the problem.

The container factory is set to batch listening, but the consumer used @RabbitHandler to have each message deserialized automatically and received it as a single entity object. We suspected messages were being lost: judging from the logs, a whole batch was probably arriving but only one message was received and processed. Since the batch was acked automatically, the remaining messages were likely lost.

Based on testing and source code analysis, this was finally confirmed to be the cause.
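The failure mode described above can be sketched with a toy model in plain Java. This is not Spring AMQP code and makes no claim about Spring's internals: `BatchLossDemo` and all of its methods are hypothetical names, and the model simply encodes the behavior we observed (a batch arrives, one message is handled, the whole batch is acked).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the buffer table plus a batch consumer; not Spring AMQP code. */
public class BatchLossDemo {
    /** Buffer-table rows still waiting to be deleted by the consumer. */
    private final Set<Integer> bufferTable = new HashSet<>();

    /** Producer side: write the buffer row, then "send" the message id. */
    public List<Integer> produce(int count) {
        List<Integer> batch = new ArrayList<>();
        for (int id = 0; id < count; id++) {
            bufferTable.add(id);
            batch.add(id);
        }
        return batch;
    }

    /** Business logic for one message: process it and delete its buffer row. */
    private void handle(int id) {
        bufferTable.remove(id);
    }

    /** Misuse: a batch arrives, but the handler only sees a single object. */
    public int consumeAsSingleObject(List<Integer> batch) {
        if (!batch.isEmpty()) {
            handle(batch.get(0));
        }
        return batch.size(); // automatic ack: the whole batch leaves the queue
    }

    /** Intended use: handle every message in the batch. */
    public int consumeAsList(List<Integer> batch) {
        for (int id : batch) {
            handle(id);
        }
        return batch.size();
    }

    public int pendingRows() {
        return bufferTable.size();
    }

    public static void main(String[] args) {
        BatchLossDemo broken = new BatchLossDemo();
        int acked = broken.consumeAsSingleObject(broken.produce(100));
        System.out.println("acked=" + acked + ", rows left=" + broken.pendingRows());
        // prints "acked=100, rows left=99"
    }
}
```

With a batch size of 100, each delivery in this model removes 100 messages from the queue but only one row from the buffer table, which matches the symptom: an empty queue and a growing table.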

So I rewrote the consumer and switched back to manual ack. (Automatic ack was only a quick fix for the backlog; manual ack is more dependable for reliable messaging.)


@Slf4j // assumption: Lombok provides the `log` field; the original omits its declaration
@Component
public class OperateHotAccountConsume {
    @Autowired
    private HotAccountService hotAccountService;
    @Autowired
    private OperateAccountRemote operateAccountRemote;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "acc_hot_account_queue", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "acc_hot_account_exchange", type = ExchangeTypes.TOPIC),
            key = "acc_hot_account_route_key"), containerFactory = "batchQueueRabbitListenerContainerFactory")
    public void process(List<Message> messages, Channel channel) {
        log.info("OperateHotAccountConsume_process receive messages size:{}", messages.size());

        for (Message message : messages) {
            try {
                handMessage(message, channel);
            } catch (IOException e) {
                log.error("OperateHotAccountConsume_process channel communication exception", e);
            }
        }

    }

    private void handMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
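        // Decode with an explicit charset (assumes UTF-8 JSON payloads) instead of the platform default
        String messageBody = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);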
        log.info("OperateHotAccountConsume_process record message,messageBody:{},deliverTag:{}", messageBody, deliveryTag);
        OperateAccountMsgReq operateAccountMsgReq = null;
        try {
            operateAccountMsgReq = JSON.parseObject(messageBody, OperateAccountMsgReq.class);
            log.info("OperateHotAccountConsume_process record message,operateAccountMsgReq:{}", operateAccountMsgReq);

            HotAccountOperateResultBO resultBO = hotAccountService.executeAsyncAccountOperate(operateAccountMsgReq);

            if (resultBO.isSuccess()) {
                channel.basicAck(deliveryTag, false);
                return;
            }

            if (!Boolean.TRUE.equals(resultBO.getRetry())) {
                log.warn("OperateHotAccountConsume_process result_fail_no_retry,operateAccountReq:{},result:{}", operateAccountMsgReq, resultBO);
                channel.basicAck(deliveryTag, false);
                return;
            }

            // Failed and retryable: re-send as a delayed message, then ack the original
            log.error("OperateHotAccountConsume_process result_fail_retry,operateAccountReq:{},result:{}", operateAccountMsgReq, resultBO);
            operateAccountRemote.sendOperateAccountMsgDelay(operateAccountMsgReq);
            channel.basicAck(deliveryTag, false);

        } catch (Exception e) {
            log.error("OperateHotAccountConsume_process exception, retry,SendOperateAccountMsgReq:{}", operateAccountMsgReq, e);
            channel.basicNack(deliveryTag, false, true);
            return;
        }

    }

}
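The ack decisions made in handMessage above can be summarized as a small decision table. The sketch below restates that branching as a pure function so it is easy to read and test in isolation; `AckDecisionDemo`, `Action`, and `decide` are hypothetical names introduced here for illustration and are not part of the service code.

```java
/** Restates the branching of handMessage as a pure function; names are illustrative. */
public class AckDecisionDemo {
    public enum Action { ACK, RESEND_DELAYED_THEN_ACK, NACK_REQUEUE }

    /**
     * @param threwException whether parsing or business processing threw
     * @param success        value of resultBO.isSuccess()
     * @param retry          value of resultBO.getRetry(), which may be null
     */
    public static Action decide(boolean threwException, boolean success, Boolean retry) {
        if (threwException) {
            return Action.NACK_REQUEUE; // basicNack(deliveryTag, false, true)
        }
        if (success) {
            return Action.ACK; // processed normally
        }
        if (!Boolean.TRUE.equals(retry)) {
            return Action.ACK; // failed, not retryable: log a warning and drop
        }
        return Action.RESEND_DELAYED_THEN_ACK; // failed, retryable
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, null));   // prints "ACK"
        System.out.println(decide(false, false, null));  // prints "ACK"
        System.out.println(decide(false, false, true));  // prints "RESEND_DELAYED_THEN_ACK"
        System.out.println(decide(true, false, null));   // prints "NACK_REQUEUE"
    }
}
```

One thing worth keeping in mind with this table: because JSON parsing happens inside the same try block, a message that can never be parsed falls into the NACK_REQUEUE branch on every redelivery. Whether that matters depends on how well-formed the producers' payloads are guaranteed to be.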

Once this went back online, the problem was resolved: neither MQ nor the buffer table accumulates a backlog any more.

3. Retrospective

@RabbitListener has a containerFactory attribute. Previously the listener used the default configuration (single-message consumption, automatic ack). After it was switched to batchQueueRabbitListenerContainerFactory, the consumption mode changed, which caused the whole chain of problems. The lesson: before modifying code, analyze the scope of the change carefully, especially for configuration classes, whose impact is wide and hard to spot. Before changing any configuration, find out what it does and what it may affect. Be careful, then be careful again! It also shows the value of reading the source code. Finally, a complaint about Spring: why is there no error when a batch message is received with a single object? This kind of misuse can lead directly to message loss.