This is the second day of my participation in the First Challenge 2022.
Preface
- Earlier we talked about how to ensure RabbitMQ messages are not lost, analyzing the problem from three perspectives: the sender, RabbitMQ itself, and the consumer.
- Back then the consumer side was only briefly introduced. Today we will analyze a pitfall of consumer acknowledgement, starting from a real usage scenario.
Send a message
- Let’s stick with the sending logic we used before.
```java
public Map<String, Object> sendMessage(Map<String, Object> params) throws UnsupportedEncodingException {
    Map<String, Object> resultMap = new HashMap<String, Object>() {{
        put("code", 200);
    }};
    String msg = "";
    Integer index = 0;
    if (params.containsKey("msg")) {
        msg = params.get("msg").toString();
    }
    if (params.containsKey("index")) {
        index = Integer.valueOf(params.get("index").toString());
    }
    if (index != 0) {
        // Start simulating an exception; the message will be lost
        int i = 1 / 0;
    }
    Map<String, Object> map = new HashMap<>();
    map.put("msg", msg);
    Message message = MessageBuilder.withBody(JSON.toJSONString(map).getBytes("UTF-8"))
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .build();
    CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend(RabbitConfig.TOPICEXCHANGE, "zxh", message, data);
    return resultMap;
}
```
- First of all, we keep the simulated exception in the send path; it was used earlier to test publisher confirms, and I was too lazy to delete it. For this test we always call the interface so that delivery succeeds, because our focus is on the consumer.
- RabbitMQ consumers support three acknowledgement modes, set via `acknowledge-mode`: `manual` means the consumer acknowledges explicitly, `auto` means the container acknowledges automatically, and `none` means no acknowledgement is sent at all. We use manual mode here.
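As a minimal sketch (assuming Spring Boot with spring-rabbit on the classpath; the bean name is illustrative), manual mode can be enabled on the listener container factory in Java, or equivalently with the property `spring.rabbitmq.listener.simple.acknowledge-mode=manual`:

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitAckConfig {

    // Listener container factory that leaves ack/nack entirely to the consumer
    @Bean
    public SimpleRabbitListenerContainerFactory manualAckFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}
```

With manual mode in place, our consumer acks or rejects each delivery itself: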
```java
@RabbitListener(queues = RabbitConfig.QUEUEFIRST)
@Async("asyncExecutor")
public void handler(Message msg, Channel channel) {
    //channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);
    byte[] body = msg.getBody();
    String messages = new String(body);
    JSONObject json = (JSONObject) JSONObject.parse(messages);
    if ("1".equals(json.getString("msg"))) {
        try {
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    if ("2".equals(json.getString("msg"))) {
        throw new RuntimeException("exception");
    }
    log.info(RabbitConfig.QUEUEFIRST + " queue consumption: " + msg);
}
```
- On the receiving side we branch on the message body: if `msg` is 1 we acknowledge the message; if it is 2 we throw an exception, which means the message is never acknowledged. When a message is not acknowledged, RabbitMQ keeps it and will try to deliver it to a consumer again.
Scene description
- I got a phone call late at night saying the online data was inconsistent. Since we have just been talking about MQ consumption, the sharp reader will already suspect that the problem lies in consuming from MQ. For me it was baffling at first: the project had been live for three days, so why did the feature only fail now?
- So I opened the project and started to troubleshoot online. First I looked at the logs and saw that the code handling MQ was throwing errors nonstop. I was glad the problem seemed so easy to find. But as I dug further, that error turned out not to be the root cause of the outage. What was actually happening online was that data was not syncing, and synchronization depends on listening for MQ messages. The logs showed that those messages were never received at all, and in the RabbitMQ management console the relevant queue was backed up.
- Here's a quick summary:
    - The MQ processing logic keeps throwing errors
    - MQ can no longer deliver any other data to the consumer
Problem analysis
- Since this happened three days after launch, I'm fairly sure the business logic itself was right; otherwise it would never have passed testing. So the question I had to think about was: why is the queue blocked? This release did include one small change to MQ, namely the addition of manual message acknowledgement.
```
Unexpected exception occurred invoking async method: public void xxxxxxxxxxxxxxxx(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel)
java.lang.IllegalStateException: Channel closed; cannot ack/nack
```
- This is one of the errors reported online; the other error, from business code, has nothing to do with our problem. From the message above we can extract two facts:
    - an async exception
    - channel closed, cannot ack/nack
- One is an async exception caused by an exception thrown inside the asynchronous processing. Because the asynchronous handler failed, the message never gets acknowledged, and by the time the acknowledgement is attempted the channel has already been closed.
- There is a solution from another blogger for the missing-ack problem, but it is not applicable to our scenario.
- We throw an exception whenever a message with msg=2 is sent. This is actually quite easy to deal with: catch exceptions globally, or wrap the spot where MQ receives the message, so that the acknowledgement still runs, as sketched below.
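A minimal sketch of that catch-everything handler (same listener shape as above; nacking without requeue is an illustrative choice, assuming a dead-letter exchange or acceptable loss, not necessarily what production should do):

```java
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Component
public class SafeConsumer {

    // No @Async here: acking from another thread risks the "Channel closed" error seen above
    @RabbitListener(queues = RabbitConfig.QUEUEFIRST)
    public void handler(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        try {
            String body = new String(msg.getBody(), StandardCharsets.UTF_8);
            // ... business logic using body, which may throw ...
            channel.basicAck(deliveryTag, false); // confirm only after processing succeeds
        } catch (Exception e) {
            // Catch everything so no delivery tag is ever left unacked.
            // requeue=false drops the message (or routes it to a DLX if one is configured).
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
```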
- At this point we have only dealt with the first problem above, the MQ handler's constant errors. Clearly this does not solve our fundamental problem: why are MQ messages blocked?
MQ prefetch settings
With acknowledgement enabled, consumers can acknowledge received messages asynchronously, as the business requires.
In practice, however, a consumer with limited processing capacity will want RabbitMQ to stop pushing messages from the queue once a certain number of deliveries are outstanding, and to resume delivery after it has acked those messages and can handle more. We can achieve this by setting prefetch_count via the basic.qos method.
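As a sketch of the raw AMQP client call (the value 10 is arbitrary, and the connection settings assume a local broker), prefetch is set per channel via basic.qos; in Spring AMQP the same cap is exposed as `factory.setPrefetchCount(...)` or the `spring.rabbitmq.listener.simple.prefetch` property:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PrefetchDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory(); // defaults to localhost:5672
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            // basic.qos: allow at most 10 unacked messages per consumer on this channel
            channel.basicQos(10);
            // ... basicConsume(queue, false, ...) and basicAck as messages are processed ...
        }
    }
}
```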
- We can see that the default prefetch_count is 250 (the Spring AMQP default). This setting is also the root cause of our MQ congestion.
- This means that every time an error prevents a message from being acknowledged, that message keeps occupying a slot on the channel between the consumer and MQ. Once all 250 slots are taken up by unacknowledgeable messages, normal data is no longer delivered to the consumer, because prefetch_count is the per-consumer upper limit on unacked messages.
- This creates a deadlock: the 250 messages we cannot confirm are never consumed, and we can never receive new messages.
- That is also why the system had no problem when it first went live: at the start, the occasional unacknowledged message did not affect our ability to process correct data, it just made things a little slower. As time went on, more and more bad messages piled up, until the limit was reached.
The solution
- Handle the exception in the business code: catch it so that the message acknowledgement is guaranteed to run.
- Alternatively, use automatic acknowledgement, or acknowledge the message before running the business logic, as sketched below.
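A sketch of the ack-first variant (at-most-once semantics: a crash after the ack loses the message, so this trades safety for throughput; it assumes the same listener as before):

```java
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

@Component
public class AckFirstConsumer {

    @RabbitListener(queues = RabbitConfig.QUEUEFIRST)
    public void handler(Message msg, Channel channel) throws IOException {
        // Acknowledge immediately so the broker can deliver the next message;
        // a failure below means this message is lost (at-most-once delivery).
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        String body = new String(msg.getBody(), StandardCharsets.UTF_8);
        // ... business logic using body ...
    }
}
```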
Conclusion
- When RabbitMQ wants to deliver a message from a queue, it traverses the queue's list of consumers and picks a suitable one. One criterion is whether the number of unacked messages on that consumer's channel has reached the configured prefetch_count; if it has, the consumer does not qualify. Once a suitable consumer is found, the traversal stops.