Preface
That evening I went back to work after dinner with colleagues. Someone in the group chat then reported that merchant XXX was not receiving push notifications. At first I thought it was nothing serious. My first guess was that the device had not registered with Aurora push, so I asked customer service to tell the merchant to log in again, and meanwhile I opened the Aurora push console to check. When more and more merchants reported missing notifications, I knew this was not going to be simple.
How the incident unfolded
With so many merchants reporting missing pushes, my first thought was that the push system had gone down. I asked our operations colleague to check every node of the push system, and all of them were healthy. I then opened the RabbitMQ console: tens of thousands of messages were ready and several hundred were unacked.
I assumed the push service had lost its connection to MQ and therefore could not push, so I asked operations to restart it. After all push service nodes were restarted, every unacked message went back to ready.
At that point I suspected a network problem was preventing MQ from receiving acks, so I asked operations to check; the network was fine. Looking back, that was a silly idea: if the network had been broken, the consumers could not have connected at all. Having established that acks were not getting through, I immediately changed the ack mode from manual to auto. Once all nodes had been upgraded, pushes returned to normal.
You would think that was the end of it, but it was not. Before long, one of the MQ nodes failed. Since production used mirrored queues, the faulty node was immediately removed from the cluster, reset, and rejoined to the cluster. That settled things for the night; by then it was almost 24:00.
At 10:00 the next morning, operations raised an alarm: one machine in the push system had a nearly full disk. Good grief, it had written almost 40 GB of logs since the previous night. One look at the error message and I knew immediately what the problem was. I quickly fixed the bug and pushed an urgent release.
A gripe about the company's ELK setup: it never collected this error message, which is why I did not catch the problem in time.
Reproducing the incident: queue blocked
MQ configuration
spring:
  # Message queue
  rabbitmq:
    host: 10.0.0.53
    username: guest
    password: guest
    virtual-host: local
    port: 5672
    # Publisher confirms
    publisher-confirm-type: correlated
    # Return unroutable messages to the publisher
    publisher-returns: true
    listener:
      simple:
        # Minimum number of concurrent consumers
        concurrency: 1
        # Maximum number of concurrent consumers
        max-concurrency: 5
        # Prefetch count: maximum number of unacked messages per consumer
        prefetch: 2
        # Manual acknowledgement
        acknowledge-mode: manual
The problematic code
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                         @Headers Map<String, Object> headers,
                         Channel channel) throws Exception {
    // Decrypt and parse. Note that these two calls sit outside the try block.
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
    try {
        // Simulate the push
        pushMsg(orderDto);
    } catch (Exception e) {
        log.error("Push failed - error message :{}, message content :{}", e.getLocalizedMessage(), JSON.toJSONString(orderDto));
    } finally {
        // Acknowledge the message
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
    }
}
At first glance nothing looks wrong. As agreed with the trading system, order data is first serialized to a JSON string and then encrypted with AES, so the consumer has to decrypt it and then parse it to obtain the order.
To prevent message loss, the trading system implemented a retransmit-on-failure mechanism. Unfortunately, the retransmitted order data was not encrypted. Decryption in the push system therefore threw an exception, and because the decryption call sits outside the try block, the finally block never ran and the message was never acked.
Silent gripe no. 1: you sit quietly at home and the blame falls on you out of the sky.
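The effect of where the decryption call sits can be shown without RabbitMQ at all. The sketch below is only an illustration: `FakeChannel`, `decrypt`, and the `enc:` prefix are hypothetical stand-ins invented here, not part of the push system or of the RabbitMQ client. It compares a handler that decrypts before the `try` with one that decrypts inside it.

```java
import java.util.ArrayList;
import java.util.List;

class AckPlacementDemo {

    /** Hypothetical stand-in for the broker channel; it only records acked delivery tags. */
    static class FakeChannel {
        final List<Long> acked = new ArrayList<>();

        void basicAck(long deliveryTag) {
            acked.add(deliveryTag);
        }
    }

    /** Hypothetical decryption step: payloads without the "enc:" prefix are rejected. */
    static String decrypt(String payload) {
        if (!payload.startsWith("enc:")) {
            throw new IllegalArgumentException("decryption failed");
        }
        return payload.substring(4);
    }

    /** Same shape as the problematic listener: decryption happens before the try block. */
    static void handleDecryptOutsideTry(String payload, long tag, FakeChannel channel) {
        String order = decrypt(payload); // if this throws, the finally below is never reached
        try {
            System.out.println("pushing " + order);
        } finally {
            channel.basicAck(tag);
        }
    }

    /** Decryption inside the try block: the finally block always acknowledges. */
    static void handleDecryptInsideTry(String payload, long tag, FakeChannel channel) {
        try {
            String order = decrypt(payload);
            System.out.println("pushing " + order);
        } catch (RuntimeException e) {
            System.out.println("push failed: " + e.getMessage());
        } finally {
            channel.basicAck(tag);
        }
    }

    public static void main(String[] args) {
        FakeChannel first = new FakeChannel();
        try {
            handleDecryptOutsideTry("plain-order", 1L, first);
        } catch (IllegalArgumentException e) {
            System.out.println("outside try: exception escaped, acked=" + first.acked);
        }

        FakeChannel second = new FakeChannel();
        handleDecryptInsideTry("plain-order", 1L, second);
        System.out.println("inside try: acked=" + second.acked);
    }
}
```

In the first handler the unencrypted payload leaves the delivery tag unacknowledged, which is exactly the state the retransmitted orders put the push system in.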
Simulating the push
Push code
Send 3 normal messages
curl http://localhost:8080/sendMsg/3
Send 1 bad message (one that fails decryption)
curl http://localhost:8080/sendErrorMsg/1
Send three more normal messages
curl http://localhost:8080/sendMsg/3
Check the logs: an error is reported, but normal messages are still pushed. In RabbitMQ, however, one message is now unacked.
Send 1 more bad message
curl http://localhost:8080/sendErrorMsg/1
Send three more normal messages
curl http://localhost:8080/sendMsg/3
At this point the console reports an error (the decryption failure, of course), but the normal messages are no longer consumed: the queue is effectively blocked.
The RabbitMQ console also shows the three messages just sent sitting in the ready state. If messages keep arriving, they pile up in the queue and are never consumed.
Send three more normal messages
curl http://localhost:8080/sendMsg/3
Root cause analysis
As described above, the queue blocked because messages were not acked. But why does a missing ack block the queue? It is a RabbitMQ protection mechanism that keeps a flood of messages from overwhelming the consumer.
RabbitMQ provides a QoS (quality of service) feature that limits the maximum number of unacknowledged messages a consumer on a channel may hold. It is configured through prefetchCount.
Picture a buffer in front of each consumer whose capacity is prefetchCount. RabbitMQ delivers messages into the buffer while it has room and stops when it is full. When the consumer acks a message, that message leaves the buffer and a new one can be delivered.
listener:
  simple:
    # Minimum number of concurrent consumers
    concurrency: 1
    # Maximum number of concurrent consumers
    max-concurrency: 5
    # Prefetch count: maximum number of unacked messages per consumer
    prefetch: 2
    # Manual acknowledgement
    acknowledge-mode: manual
The prefetch property is the prefetchCount.
In the configuration above, prefetch is only 2 and concurrency is only 1. After I sent two bad messages, neither was acked because decryption failed, so together they filled the buffer. RabbitMQ then treated the consumer as unable to take more work and stopped delivering to it, which blocked the queue.
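The blocking behaviour can be replayed with a toy model. This is a minimal sketch that assumes a single consumer and treats any message whose text starts with `bad` as one that is never acked; `ToyQueue` is a hypothetical class written for this illustration and does not use the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PrefetchWindowDemo {

    /** Toy model of one queue with one consumer; not the RabbitMQ API. */
    static class ToyQueue {
        final int prefetch;
        final Deque<String> ready = new ArrayDeque<>();
        int unacked = 0;
        int consumed = 0;

        ToyQueue(int prefetch) {
            this.prefetch = prefetch;
        }

        /** Publish a message, then deliver while the prefetch window has room. */
        void publish(String message) {
            ready.addLast(message);
            deliver();
        }

        private void deliver() {
            while (unacked < prefetch && !ready.isEmpty()) {
                String message = ready.pollFirst();
                unacked++; // delivered, now waiting for an ack
                if (!message.startsWith("bad")) {
                    unacked--; // healthy message: processed and acked
                    consumed++;
                }
                // a bad message is never acked, so it keeps its slot in the window
            }
        }

        String describe() {
            return "ready=" + ready.size() + ", unacked=" + unacked + ", consumed=" + consumed;
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue(2); // prefetch: 2, as in the configuration above

        for (int i = 0; i < 3; i++) queue.publish("ok");
        queue.publish("bad-1");
        for (int i = 0; i < 3; i++) queue.publish("ok");
        System.out.println("after first bad message:  " + queue.describe());

        queue.publish("bad-2");
        for (int i = 0; i < 3; i++) queue.publish("ok");
        System.out.println("after second bad message: " + queue.describe());
    }
}
```

After the first bad message one slot of the window is still free, so normal messages keep flowing. After the second, both slots are held by messages that will never be acked, and the last three messages stay in ready, matching what the RabbitMQ console showed.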
Determining whether the queue is at risk of blocking
If the ack mode is manual and unacked messages show up in production, do not panic. QoS limits the number of unacknowledged messages that each consumer on a channel may hold, so the number of unacked messages the system can tolerate is channelCount * prefetchCount * number of nodes.
channelCount is determined by the concurrency setting.
min = concurrency * prefetch * number of nodes
max = max-concurrency * prefetch * number of nodes
From this we can conclude:
- unacked_msg_count < min: the queue does not block, but the unacked messages should still be dealt with promptly.
- unacked_msg_count >= min: the queue may block.
- unacked_msg_count >= max: the queue is certain to block.
This part takes a moment to digest.
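To make the thresholds concrete, here is a small sketch that applies the two formulas to the configuration used in this article (concurrency 1, max-concurrency 5, prefetch 2). The class, the enum, and the three-node example are assumptions made for illustration; the article does not state how many nodes ran in production.

```java
class BlockingRiskDemo {

    enum Risk { SAFE, MAY_BLOCK, BLOCKED }

    /** Classifies an unacked count against the min and max thresholds described above. */
    static Risk assess(int unacked, int concurrency, int maxConcurrency, int prefetch, int nodes) {
        int min = concurrency * prefetch * nodes;
        int max = maxConcurrency * prefetch * nodes;
        if (unacked >= max) {
            return Risk.BLOCKED;
        }
        if (unacked >= min) {
            return Risk.MAY_BLOCK;
        }
        return Risk.SAFE;
    }

    public static void main(String[] args) {
        // Single node, as in the local reproduction: min = 2, max = 10.
        System.out.println("1 node, 1 unacked:   " + assess(1, 1, 5, 2, 1));
        System.out.println("1 node, 2 unacked:   " + assess(2, 1, 5, 2, 1));
        System.out.println("1 node, 10 unacked:  " + assess(10, 1, 5, 2, 1));

        // Hypothetical three-node deployment: min = 6, max = 30.
        System.out.println("3 nodes, 5 unacked:  " + assess(5, 1, 5, 2, 3));
        System.out.println("3 nodes, 30 unacked: " + assess(30, 1, 5, 2, 3));
    }
}
```

On a single node the two bad messages from the reproduction already reach min, which is consistent with the queue stalling as soon as the second one arrived.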
The fix
The fix is simple: move decryption and parsing inside the try block, so the message is acknowledged whether or not decryption succeeds. If an error occurs, an error log is written for a developer to follow up on.
This only works if you have a log monitoring system that raises alerts promptly.
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                         @Headers Map<String, Object> headers,
                         Channel channel) throws Exception {
    try {
        // Decrypt and parse
        String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
        OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
        // Simulate the push
        pushMsg(orderDto);
    } catch (Exception e) {
        log.error("Push failed - error message :{}, message content :{}", e.getLocalizedMessage(), encryptOrderDto);
    } finally {
        // Acknowledge the message
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
    }
}
Note
When a consumer disconnects, its unacked messages automatically return to the head of the queue. This is why restarting the push service turned every unacked message back into ready.
Reproducing the incident: disk usage spikes
At the time I did not know the code was at fault. I only knew that acks were missing, so as an emergency upgrade I changed the ack mode to auto, expecting every message to be acknowledged whether or not it was processed correctly. That did make the immediate problem go away.
In hindsight this was a dangerous change. With broker-level automatic acknowledgement QoS no longer takes effect, so a flood of messages can pour into the consumer and bring it down. (Strictly speaking, that is what acknowledge-mode: none does in Spring AMQP. With acknowledge-mode: auto the listener container sends the ack or reject itself, depending on whether the listener throws, and prefetch still applies.) Nothing went wrong that night, probably because transaction volume was low and the push system ran on multiple nodes.
The problematic code
@RabbitListener(queues = ORDER_QUEUE)
public void receiveOrder(@Payload String encryptOrderDto,
                         @Headers Map<String, Object> headers,
                         Channel channel) throws Exception {
    // Decrypt and parse. These two calls still sit outside the try block.
    String decryptOrderDto = EncryptUtil.decryptByAes(encryptOrderDto);
    OrderDto orderDto = JSON.parseObject(decryptOrderDto, OrderDto.class);
    try {
        // Simulate the push
        pushMsg(orderDto);
    } catch (Exception e) {
        log.error("Push failed - error message :{}, message content :{}", e.getLocalizedMessage(), encryptOrderDto);
    } finally {
        // Acknowledge the message
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
    }
}
The configuration file
listener:
  simple:
    # Minimum number of concurrent consumers
    concurrency: 1
    # Maximum number of concurrent consumers
    max-concurrency: 5
    # Prefetch count: maximum number of unacked messages per consumer
    prefetch: 2
    # Automatic acknowledgement
    acknowledge-mode: auto
At that time we still did not know that the trading system's retransmission mechanism had a bug (retransmitted order data was not encrypted), so it kept sending a small number of bad messages.
Send 1 bad message
curl http://localhost:8080/sendErrorMsg/1
Cause
When the RabbitMQ message listener throws an exception, the consumer sends basic.reject to the RabbitMQ server to indicate that the message was rejected, and the server then redelivers it. Because the same message fails every time, this becomes an infinite loop, which is why the console frantically printed error logs and disk usage skyrocketed.
The solution
Set default-requeue-rejected: false (under spring.rabbitmq.listener.simple), so that a rejected message is not put back on the queue.
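The redelivery loop and the effect of turning requeue off can be seen in a toy simulation. This is only a sketch under simplifying assumptions: a single consumer, a rejected message put straight back at the head of the queue, and a `maxDeliveries` cap (a parameter invented here) so that the loop terminates. None of the names below come from Spring AMQP or the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

class RequeueLoopDemo {

    /**
     * Delivers messages from a toy queue. A message starting with "bad" makes the
     * listener fail and is rejected; with requeueRejected it goes back to the head.
     * Returns the number of failed deliveries, i.e. the number of error log entries.
     */
    static int run(Deque<String> queue, boolean requeueRejected, int maxDeliveries) {
        int failures = 0;
        int deliveries = 0;
        while (!queue.isEmpty() && deliveries < maxDeliveries) {
            String message = queue.pollFirst();
            deliveries++;
            if (message.startsWith("bad")) {
                failures++; // the listener threw; one more error is logged
                if (requeueRejected) {
                    queue.addFirst(message); // redelivered on the next iteration
                }
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Deque<String> requeued = new ArrayDeque<>(List.of("bad-order", "ok"));
        System.out.println("requeue=true:  failures=" + run(requeued, true, 1000)
                + ", still queued=" + requeued.size());

        Deque<String> dropped = new ArrayDeque<>(List.of("bad-order", "ok"));
        System.out.println("requeue=false: failures=" + run(dropped, false, 1000)
                + ", still queued=" + dropped.size());
    }
}
```

With requeue enabled, the failure count is limited only by the cap, which mirrors the 40 GB of error logs. With requeue disabled the bad message fails once and is dropped, so it is lost unless it is captured some other way, for example in the error log.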
Conclusion
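- Avoid automatic acknowledgement in production. With broker-level auto-ack (acknowledge-mode: none in Spring AMQP) QoS does not take effect, and with acknowledge-mode: auto a listener that keeps throwing can trigger endless redelivery, as happened here.
- When using manual ack, be very careful to acknowledge every message.
- In fact, when the faulty MQ node was reset, the bad messages were cleared along with it, which is why the problem went away. In other words, those messages were lost.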
try {
    // Business logic.
} catch (Exception e) {
    // Output an error log.
} finally {
    // Acknowledge the message.
}
Code repository
https://gitee.com/huangxunhui/rabbitmq_accdient.git
Closing words
If someone tells you not to panic during a production incident, that is nonsense unless you are a battle-hardened veteran. Let them try it and see whether their mind goes blank and they break into a sweat.
If you found this helpful, feel free to comment and like. You can also visit my home page, where you may find other articles you enjoy, and follow me if you wish. Thank you.