This is the 13th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

The problem of repeated message consumption

Duplicate message consumption is a common problem that can occur in any MQ, and it can have serious consequences in sensitive scenarios, such as deducting a payment twice.

Message repeat consumption scenarios and workarounds

Under what circumstances does repeated consumption of RocketMQ messages occur?

Producer repeat send scenario

When the system call chain is long, duplicates can originate upstream. For example, system A calls system B, and system B sends a message to RocketMQ while handling that call.

If system B processes the call successfully but fails to return the result to system A, system A retries the request. System B then processes it again and sends another message to RocketMQ, so the same message is consumed more than once.

Producer send-retry scenario

The same problem can occur when system B sends a message to RocketMQ: if the send times out and system B retries, RocketMQ receives a duplicate message.

Consumer repeat consumption scenario

After RocketMQ has received the message and delivered it to the consumer, the consumer may go down or restart before it has submitted the offset to RocketMQ. RocketMQ then treats the consumption as failed and delivers the message to the consumer again.
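Since all three scenarios end with the consumer seeing the same message twice, a common workaround is to make the consumer idempotent. The following is a minimal sketch in plain Java, not RocketMQ API code: the class name, the `businessKey` parameter, and the in-memory set are all assumptions made for illustration. A real system would keep the processed keys in a database or cache so that they survive a restart.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: deduplicates by a business key carried in each message.
class IdempotentConsumerSketch {
    // Keys of messages that were already processed (in memory only, for illustration).
    private final Set<String> processedKeys = new HashSet<>();
    private long balance;

    IdempotentConsumerSketch(long initialBalance) {
        this.balance = initialBalance;
    }

    // Applies the deduction once per business key; returns false for a duplicate.
    synchronized boolean deduct(String businessKey, long amount) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip the business logic
        }
        balance -= amount;
        return true;
    }

    synchronized long balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(100);
        consumer.deduct("order-1", 30);
        consumer.deduct("order-1", 30); // redelivered duplicate, ignored
        System.out.println(consumer.balance()); // prints 70
    }
}
```

With this in place, a redelivered message is acknowledged as usual but the deduction is applied only once.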

The consumer does not return success

One possible cause of duplicate consumption: an exception occurs while the consumer is processing the message, so it never returns the CONSUME_SUCCESS flag.

When message processing throws an exception, RocketMQ keeps redelivering the message until it is consumed successfully.

The Javadoc on the official consumeMessage method says:
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure

Whatever you do, do not throw an exception, and if you need to re-consume, return RECONSUME_LATER to request re-consumption.

Catch the root Exception type so that no exception thrown by the business processing escapes the listener:

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
        MessagePack msgpack = new MessagePack();
        for (MessageExt msg : msgs) {
            byte[] data = msg.getBody();
            try {
                RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);
                logger.debug("Receive a message:" + rtmsg);
                anlysisRTMsgPack(rtmsg, engine);
            } catch (IOException e) {
                // Unpacking failed: log it instead of throwing
                logger.error("Unpack RTMsg:", e);
            } catch (Exception e1) {
                // Catch everything else so no exception escapes the listener
                logger.warn("Unexpected exception.", e1);
            }
        }
        logger.debug("RETURN CONSUME SUCCESS.");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

The CONSUME_FROM_LAST_OFFSET setting problem

When a consumer starts consuming, it sets where to start from. The default is CONSUME_FROM_LAST_OFFSET; the available values are shown in the code.

public enum ConsumeFromWhere {
    /**
     * A new subscription group starts consuming from the last position in the queue the first time it starts
     */
    CONSUME_FROM_LAST_OFFSET,
    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    /**
     * A new subscription group starts consuming from the first position in the queue the first time it starts;
     * later restarts continue from the last consumption progress
     */
    CONSUME_FROM_FIRST_OFFSET,
    /**
     * A new subscription group starts consuming from the specified point in time the first time it starts;
     * later restarts continue from the last consumption progress.
     * The point in time is set by the consumeTimestamp parameter of DefaultMQPushConsumer
     */
    CONSUME_FROM_TIMESTAMP,
}
  • CONSUME_FROM_LAST_OFFSET: consume from the consumer's last offset.

    • For a new consumer, the starting point is determined by the consumer group the client belongs to.

    • If the consumer group is newly launched and none of the earliest messages of the subscribed topic have expired yet, the designers of RocketMQ assume that this is a newly launched business and force consumption from the first message.

    • If some of the subscribed messages have already expired, consumption starts from the point at which the client started.

The ConsumeFromWhere parameter is valid only when a new consumer is first started

  • CONSUME_FROM_FIRST_OFFSET: consume from the minimum offset.

  • CONSUME_FROM_TIMESTAMP: consume starting from a specified point in time.

  • Whether a ConsumerGroup is new is determined on the broker side.

  • The consumer first stores its consumption offset locally and periodically synchronizes it to the broker.

  • The broker decides whether a ConsumerGroup is new by checking whether it already holds an offset for that ConsumerGroup.

When the setting has no effect

For a new queue, this parameter is also ignored, and consumption starts at offset zero.

So if you ask "I have already set CONSUME_FROM_LAST_OFFSET, why am I still consuming old messages again?", the answer may be that your ConsumerGroup is not new, or that the queue is new.
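The rules described in this section can be condensed into a small decision function. This is a sketch of the behavior as the article describes it, not the broker's actual code: the class, the `resolve` method, and the convention that a negative `storedOffset` means "the broker has no offset for this group" are assumptions for illustration, and CONSUME_FROM_TIMESTAMP is left out.

```java
// Hypothetical sketch of how the starting offset is chosen, following the rules above.
class StartOffsetSketch {
    enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET }

    // storedOffset < 0 is used here to mean: the broker holds no offset for this group.
    static long resolve(ConsumeFromWhere where, long storedOffset, long minOffset, long maxOffset) {
        if (storedOffset >= 0) {
            return storedOffset; // existing group: the setting is ignored
        }
        if (where == ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET) {
            return minOffset;
        }
        // CONSUME_FROM_LAST_OFFSET for a new group:
        // nothing has expired yet (or the queue is new), so start from the first message;
        // otherwise start from the current end of the queue.
        return minOffset == 0 ? 0 : maxOffset;
    }

    public static void main(String[] args) {
        ConsumeFromWhere last = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
        System.out.println(resolve(last, -1, 0, 500));   // prints 0
        System.out.println(resolve(last, -1, 120, 500)); // prints 500
        System.out.println(resolve(last, 42, 120, 500)); // prints 42
    }
}
```

The first and third calls are the two cases that surprise people: a new queue or a not-yet-expired topic starts from zero, and an existing group always resumes from its stored offset.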

Retry queue and dead letter queue

  • The consumer side has not returned the result of consumption. RocketMQ assumes that the message was not received, and the next time the consumer pulls, the broker will still send the message.

  • When any exception is caught, return ConsumeConcurrentlyStatus.RECONSUME_LATER.

RocketMQ then puts the message into the retry queue, whose topic is %RETRY% + the name of the ConsumerGroup.

  • The retried message is redelivered to the ConsumerGroup after a delay (default is 10 seconds, configurable by the business).

  • However, if consumption keeps failing up to a certain number of times (default 16), the message is delivered to the DLQ (dead-letter queue), and manual intervention is needed at that point.
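The routing between the retry queue and the dead-letter queue can be sketched as follows. This is plain illustrative Java, not RocketMQ source: the class and method names are assumptions, and only the topic prefixes and the default of 16 come from the text above.

```java
// Hypothetical sketch: where a failed message goes next, based on its retry count.
class RetryRoutingSketch {
    static final int MAX_RECONSUME_TIMES = 16; // default stated in the article

    static String retryTopic(String consumerGroup) {
        return "%RETRY%" + consumerGroup;
    }

    static String dlqTopic(String consumerGroup) {
        return "%DLQ%" + consumerGroup;
    }

    // reconsumeTimes is the number of retries this message has already gone through.
    static String nextTopic(String consumerGroup, int reconsumeTimes) {
        return reconsumeTimes >= MAX_RECONSUME_TIMES
                ? dlqTopic(consumerGroup)
                : retryTopic(consumerGroup);
    }

    public static void main(String[] args) {
        System.out.println(nextTopic("order_group", 0));  // prints %RETRY%order_group
        System.out.println(nextTopic("order_group", 16)); // prints %DLQ%order_group
    }
}
```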

/** Batch consumption size */
private int consumeMessageBatchMaxSize = 1;

/** Batch pull size */
private int pullBatchSize = 32;
  • consumeMessageBatchMaxSize is the maximum number of messages consumed in one batch

  • pullBatchSize is the maximum number of messages pulled at a time

On the broker side:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

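This parameter defines the delay levels that retries step through, each later level waiting longer than the one before it. Consumer retries start from the third level, which matches the 10-second default mentioned above: the first retry is delayed by 10s, the second by 30s, and so on.

Do not change this value in a production environment. For reference, a modified setting looks like this: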
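To make the format of this setting concrete, here is a small sketch that parses the string into durations and looks up the delay for a given level. It is not the broker's parser: the class and method names are assumptions, levels are treated as 1-based, and which level a given retry maps to is deliberately left to the caller.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: parses a messageDelayLevel string such as "1s 5s 10s ... 2h".
class DelayLevelSketch {
    static List<Duration> parse(String messageDelayLevel) {
        List<Duration> levels = new ArrayList<>();
        for (String token : messageDelayLevel.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': levels.add(Duration.ofSeconds(value)); break;
                case 'm': levels.add(Duration.ofMinutes(value)); break;
                case 'h': levels.add(Duration.ofHours(value)); break;
                default: throw new IllegalArgumentException("Unknown unit in: " + token);
            }
        }
        return levels;
    }

    // level is 1-based: level 1 is the first entry in the string.
    static Duration delayForLevel(List<Duration> levels, int level) {
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        List<Duration> levels =
                parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println(levels.size());                          // prints 18
        System.out.println(delayForLevel(levels, 3).getSeconds());  // prints 10
        System.out.println(delayForLevel(levels, 18).toHours());    // prints 2
    }
}
```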

messageDelayLevel = 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s 5s

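After 16 failed retries, the message is moved to a new topic: %DLQ% + the ConsumerGroup name.

The default is 16 times and can be changed with DefaultMQPullConsumer.

Whether DefaultMQPushConsumer can modify this value depends on the client version: Apache RocketMQ 4.x clients expose setMaxReconsumeTimes on it.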

consumeMessageBatchMaxSize is the number of messages that the consumer's registered callback listener handles in one call (default 1). It is not the number of messages pulled each time (pullBatchSize, default 32). Do not confuse the two.
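The difference between the two sizes can be pictured as one pulled batch being cut into smaller lists before the listener is called. The sketch below only illustrates that relationship in plain Java; the class and the `split` method are hypothetical and are not the client's internal implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one pulled batch is handed to the listener in smaller chunks.
class BatchSplitSketch {
    static <T> List<List<T>> split(List<T> pulled, int consumeMessageBatchMaxSize) {
        if (consumeMessageBatchMaxSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < pulled.size(); i += consumeMessageBatchMaxSize) {
            int end = Math.min(i + consumeMessageBatchMaxSize, pulled.size());
            batches.add(new ArrayList<>(pulled.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 32; i++) {
            pulled.add(i); // stands in for 32 pulled messages (pullBatchSize = 32)
        }
        System.out.println(split(pulled, 1).size());  // prints 32
        System.out.println(split(pulled, 10).size()); // prints 4
    }
}
```

With the defaults, a pull of 32 messages therefore results in 32 listener calls, each receiving a list with a single message.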

Updates to message consumption progress

Future articles will cover the functionality and analysis of progress updates