This is the 12th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021
preface
Recently, I found that the data in the database often repeats, and I wanted to locate this problem. The insert operation of the data will only drop when a RocketMQ message is received and processed. I wonder if RocketMQ messages are being sent twice. Pull log analysis.
It is not hard to see that the same messageId was received five times and continued to be sent as time went by. It makes you wonder, under what circumstances does RocketMQ send messages repeatedly?
A scenario where messages are repeated
The message duplication scenario is as follows:
1) Repeated Messages After a message is successfully sent to the server for persistence, the server fails to respond to the client due to intermittent network disconnection or client breakdown. If at this point the producer realizes that the Message failed and tries to send the Message again, the consumer will subsequently receive two messages with the same content and the same Message ID.
2) In the scenario of repeated message consumption during delivery, the message has been delivered to the consumer and business processing has been completed. When the client responds to the server, the network breaks off. To ensure that the Message is consumed at least once, the RocketMQ version of the Message queue will try to deliver the previously processed Message once the network is restored. The consumer will then receive two messages with the same content and the same Message ID.
Message duplication during load balancing (including but not limited to network jitter, Broker restart, and consumer application restart). When the RocketMQ Broker or client restarts, expands or shrinks, it triggers Rebalance, and consumers may receive repeated messages.
So what kind of situation do I fall into? First of all, our service was normal when we received the duplicate message, and RocketMQ was stable. So we get rid of 1, 3 points. In point 2, when the client responds to the server, the network is intermittently disconnected. It means that the client does not respond after processing the service, and the network is intermittently disconnected. Another way to think about it, if the client is doing business and doesn’t respond in a timely manner, what is RocketMQ for? RocketMQ will consider a failed message consumption if it does not receive a response after the specified timeout period. To ensure that the Message is consumed at least once, RocketMQ naturally sends a Message with the same Message ID again. And the problem I have here, it is the business processing time too long, resulting in. The average processing time is around two minutes. The default Message consumption timeout for RocketMQ is 15 minutes, determined by the ConsumeTimeout parameter. You can see that in this code.
/** * Sets the maximum timeout for each message to be consumed. If this timeout is exceeded, the message will be regarded as failed to be consumed again until the next time it is redelivered. Each business needs to set a reasonable value. */ public static Final String ConsumeTimeout = "ConsumeTimeout ";Copy the code
RocketMQ, which we packaged, sets the message consumption timeout to 1 minute by default, causing this problem
@data public static class Customize {/** * message listener class ** @see AbstractMessageListener */ private String listenerClassName = "tech.baoyun.starter.ons.comsume.DefaultMessageListener"; /** * @see com.aliyun.openservices.ons.api.PropertyKeyConst#ConsumeTimeout */ private Integer consumeTimeout = 1; /** * @see com.aliyun.openservices.ons.api.PropertyKeyConst#MaxReconsumeTimes */ private Integer maxReconsumeTimes = 5; /** * @see com.aliyun.openservices.ons.api.PropertyKeyConst#ConsumeThreadNums */ private Integer consumeThreadNums = 10; /** * @see com.aliyun.openservices.ons.api.PropertyKeyConst#SuspendTimeMillis */ private Integer suspendTimeMillis = 60 * 1000; }Copy the code